Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • 1048-service-config-backfilling
  • NAT-1154-import-edge-port-update
  • develop
  • feature/10GGBS-NAT-980
  • feature/NAT-1150-model-commecial-peers
  • feature/NAT-1182-rename-geant-plus-descriptions
  • feature/NAT-732-ias-to-re-interconnect
  • feature/add-moodi-wf-to-router
  • feature/mass-base-config-redeploy
  • feature/nat-1211-edgeport-lacp-xmit
  • feature/rename-geant-plus-descriptions
  • fix/NAT-1009/fix-redeploy-base-config-if-there-is-a-vprn
  • fix/l3-imports
  • fix/nat-1120-sdp-validation
  • master
  • update_change_log
  • 0.1
  • 0.2
  • 0.3
  • 0.4
  • 0.5
  • 0.6
  • 0.7
  • 0.8
  • 0.9
  • 1.0
  • 1.1
  • 1.4
  • 1.5
  • 2.0
  • 2.1
  • 2.10
  • 2.11
  • 2.12
  • 2.13
  • 2.14
  • 2.15
  • 2.16
  • 2.17
  • 2.18
  • 2.19
  • 2.2
  • 2.20
  • 2.21
  • 2.22
  • 2.23
  • 2.24
  • 2.25
  • 2.26
  • 2.27
  • 2.28
  • 2.29
  • 2.3
  • 2.31
  • 2.32
  • 2.33
  • 2.34
  • 2.35
  • 2.36
  • 2.37
  • 2.38
  • 2.39
  • 2.4
  • 2.40
  • 2.41
  • 2.42
  • 2.43
  • 2.44
  • 2.45
  • 2.46
  • 2.47
  • 2.48
  • 2.5
  • 2.6
  • 2.7
  • 2.8
  • 2.9
  • 3.0
  • 3.1
  • 3.2
  • 3.3
  • 3.4
  • 3.5
  • 3.6
  • 3.7
  • 3.8
  • Lime-Seal
87 results

Target

Select target project
  • goat/gap/geant-service-orchestrator
1 result
Select Git revision
  • 1048-service-config-backfilling
  • NAT-1154-import-edge-port-update
  • develop
  • feature/10GGBS-NAT-980
  • feature/NAT-1150-model-commecial-peers
  • feature/NAT-1182-rename-geant-plus-descriptions
  • feature/NAT-732-ias-to-re-interconnect
  • feature/add-moodi-wf-to-router
  • feature/mass-base-config-redeploy
  • feature/nat-1211-edgeport-lacp-xmit
  • feature/rename-geant-plus-descriptions
  • fix/NAT-1009/fix-redeploy-base-config-if-there-is-a-vprn
  • fix/l3-imports
  • fix/nat-1120-sdp-validation
  • master
  • update_change_log
  • 0.1
  • 0.2
  • 0.3
  • 0.4
  • 0.5
  • 0.6
  • 0.7
  • 0.8
  • 0.9
  • 1.0
  • 1.1
  • 1.4
  • 1.5
  • 2.0
  • 2.1
  • 2.10
  • 2.11
  • 2.12
  • 2.13
  • 2.14
  • 2.15
  • 2.16
  • 2.17
  • 2.18
  • 2.19
  • 2.2
  • 2.20
  • 2.21
  • 2.22
  • 2.23
  • 2.24
  • 2.25
  • 2.26
  • 2.27
  • 2.28
  • 2.29
  • 2.3
  • 2.31
  • 2.32
  • 2.33
  • 2.34
  • 2.35
  • 2.36
  • 2.37
  • 2.38
  • 2.39
  • 2.4
  • 2.40
  • 2.41
  • 2.42
  • 2.43
  • 2.44
  • 2.45
  • 2.46
  • 2.47
  • 2.48
  • 2.5
  • 2.6
  • 2.7
  • 2.8
  • 2.9
  • 3.0
  • 3.1
  • 3.2
  • 3.3
  • 3.4
  • 3.5
  • 3.6
  • 3.7
  • 3.8
  • Lime-Seal
87 results
Show changes
Commits on Source (9)
Showing with 237 additions and 57 deletions
......@@ -9,6 +9,7 @@ oss-params.json
.pytest_cache
.ruff_cache
.tox
build/
# Documentation
docs/build
......
......@@ -3,6 +3,7 @@ stages:
- tox
- documentation
- sonarqube
- trigger_jenkins_build
include:
- docs/.gitlab-ci.yml
......@@ -55,3 +56,16 @@ sonarqube:
- sonar-scanner -Dsonar.login=$SONAR_TOKEN -Dproject.settings=./sonar.properties
tags:
- docker-executor
trigger_jenkins_build:
stage: trigger_jenkins_build
image: alpine:latest
before_script:
- apk add --no-cache curl
script: curl -u "$JENKINS_USERNAME:$JENKINS_API_TOKEN"
--data "PROJECT=geant-service-orchestrator"
"http://test-swd-release-service01.geant.org:8080/job/build-python-snapshot/buildWithParameters"
only:
- develop
tags:
- docker-executor
......@@ -15,9 +15,12 @@ RUN pip install \
# Set the environment variable for the translations directory
ENV TRANSLATIONS_DIR=/app/gso/translations/
COPY --chmod=755 entrypoint.sh /app/entrypoint.sh
# Copy the shell scripts and ensure scripts do not have Windows line endings and make them executable
COPY start-app.sh start-worker.sh start-scheduler.sh /app/
RUN sed -i 's/\r$//' start-app.sh start-worker.sh start-scheduler.sh && \
chmod 755 start-app.sh start-worker.sh start-scheduler.sh
RUN chown -R appuser:appgroup /app
USER appuser
EXPOSE 8080
ENTRYPOINT ["/app/entrypoint.sh"]
ENTRYPOINT ["/app/start-app.sh"]
// https://gitlab.geant.net/live-projects/jenkins-pipeline/-/tree/master/vars
library 'SWDPipeline'
// Parameters:
// name (must match the name of the project in GitLab/SWD release jenkins)
String name = 'geant-service-orchestrator'
// emails of people to always notify on build status changes
List<String> extraRecipients = ['erik.reid@geant.org']
// python versions (docker tags) to test against, must be explicit versions
List<String> pythonTestVersions = ['3.11']
// Environment variables you want to pass
Map<String, String> appEnvironmentVariables = [
'SKIP_ALL_TESTS': '1',
// add more as needed
]
SimplePythonBuild(name, extraRecipients, pythonTestVersions, appEnvironmentVariables)
from typer import Typer
import typer
from orchestrator import OrchestratorCore, app_settings
from orchestrator.cli.main import app as cli_app
import gso.products # noqa: F401
import gso.workflows # noqa: F401
from gso.api import router as api_router
from gso.cli import netbox
def load_gso_cli(app: Typer) -> None:
def init_gso_app() -> OrchestratorCore:
app = OrchestratorCore(base_settings=app_settings)
app.include_router(api_router, prefix="/api")
return app
def init_worker_app() -> OrchestratorCore:
return OrchestratorCore(base_settings=app_settings)
def init_cli_app() -> typer.Typer:
from gso.cli import import_sites
app.add_typer(import_sites.app, name="import_sites")
app.add_typer(netbox.app, name="netbox-cli")
cli_app.add_typer(import_sites.app, name="import_sites")
cli_app.add_typer(netbox.app, name="netbox-cli")
return cli_app()
"""The main module that runs :term:`GSO`."""
import typer
from orchestrator import OrchestratorCore
from orchestrator.cli.main import app as core_cli
from orchestrator.settings import AppSettings
import gso.products # noqa: F401
import gso.workflows # noqa: F401
from gso import load_gso_cli
from gso.api import router as api_router
from gso import init_cli_app, init_gso_app
def init_gso_app(settings: AppSettings) -> OrchestratorCore:
app = OrchestratorCore(base_settings=settings)
app.include_router(api_router, prefix="/api")
return app
def init_cli_app() -> typer.Typer:
load_gso_cli(core_cli)
return core_cli()
app = init_gso_app(settings=AppSettings())
app = init_gso_app()
if __name__ == "__main__":
init_cli_app()
......@@ -49,5 +49,10 @@
"scheme": "https",
"api_base": "localhost:44444",
"api_version": 1123
},
"CELERY": {
"broker_url": "redis://localhost:6379/0",
"result_backend": "rpc://localhost:6379/0",
"result_expires": 3600
}
}
import inspect
from functools import wraps
from typing import Any, Callable
from celery import current_app
from celery.schedules import crontab
def scheduler(
name: str,
minute: str = "*",
hour: str = "*",
day_of_week: str = "*",
day_of_month: str = "*",
month_of_year: str = "*",
) -> Callable[[Callable], Callable]:
"""Schedule a Celery task using crontab-like timing.
Examples
--------
- `minute='*/15'`: Run every 15 minutes.
- `hour='*/3'`: Run every 3 hours.
- `day_of_week='mon-fri'`: Run on weekdays only.
- `day_of_month='1-7,15-21'`: Run on the first and third weeks of the month.
- `month_of_year='*/3'`: Run on the first month of each quarter.
All time units can be specified with lists of numbers or crontab pattern strings for advanced scheduling.
All specified time parts (minute, hour, day, etc.) must align for a task to run.
"""
def decorator(task_func: Callable) -> Callable:
@wraps(task_func)
def scheduled_task(*args: Any, **kwargs: Any) -> Any:
return task_func(*args, **kwargs)
module = inspect.getmodule(task_func)
if module is None:
raise ValueError(f"Module for the task function {task_func.__name__} could not be found.")
task_path = f"{module.__name__}.{task_func.__name__}"
current_app.conf.beat_schedule[task_func.__name__] = {
"name": name,
"task": task_path,
"schedule": crontab(
minute=minute,
hour=hour,
day_of_month=day_of_month,
month_of_year=month_of_year,
day_of_week=day_of_week,
),
}
return scheduled_task
return decorator
from orchestrator.services.processes import start_process
from gso.schedules.scheduling import scheduler
from gso.worker import celery
@celery.task
@scheduler(name="Clean up tasks", hour="*/6")
def vacuum_tasks() -> None:
start_process("task_clean_up_tasks")
from orchestrator.services.processes import start_process
from gso.schedules.scheduling import scheduler
from gso.services.subscriptions import count_incomplete_validate_products
from gso.worker import celery
@celery.task
@scheduler(name="Validate Products and inactive subscriptions", minute="30", hour="2")
def validate_products() -> None:
if count_incomplete_validate_products() > 0:
start_process("task_validate_products")
import structlog
from orchestrator.services.processes import get_execution_context
from orchestrator.services.subscriptions import TARGET_DEFAULT_USABLE_MAP, WF_USABLE_MAP
from orchestrator.targets import Target
from gso.schedules.scheduling import scheduler
from gso.services.subscriptions import get_insync_subscriptions
from gso.worker import celery
logger = structlog.get_logger(__name__)
@celery.task
@scheduler(name="Subscriptions Validator", minute="10", hour="0")
def validate_subscriptions() -> None:
for subscription in get_insync_subscriptions():
validation_workflow = None
for workflow in subscription.product.workflows:
if workflow.target == Target.SYSTEM:
validation_workflow = workflow.name
if validation_workflow:
default = TARGET_DEFAULT_USABLE_MAP[Target.SYSTEM]
usable_when = WF_USABLE_MAP.get(validation_workflow, default)
if subscription.status in usable_when:
json = [{"subscription_id": str(subscription.subscription_id)}]
validate_func = get_execution_context()["validate"]
validate_func(validation_workflow, json=json)
else:
logger.warning(
"SubscriptionTable has no validation workflow",
subscription=subscription,
product=subscription.product.name,
)
......@@ -2,13 +2,13 @@ from typing import Any
from uuid import UUID
from orchestrator.db import (
ProcessTable,
ProductTable,
ResourceTypeTable,
SubscriptionInstanceTable,
SubscriptionInstanceValueTable,
SubscriptionTable,
)
from orchestrator.graphql.schemas.subscription import Subscription
from orchestrator.types import SubscriptionLifecycle
from gso.products import ProductType
......@@ -87,7 +87,7 @@ def get_product_id_by_name(product_name: ProductType) -> UUID:
return ProductTable.query.filter_by(name=product_name).first().product_id
def get_active_subscriptions_by_field_and_value(field_name: str, field_value: str) -> list[Subscription]:
def get_active_subscriptions_by_field_and_value(field_name: str, field_value: str) -> list[SubscriptionTable]:
"""Retrieve a list of active subscriptions based on a specified field and its value.
:param field_name: The name of the field to filter by.
......@@ -97,7 +97,7 @@ def get_active_subscriptions_by_field_and_value(field_name: str, field_value: st
:type field_value: Any
:return: A list of active Subscription objects that match the criteria.
:rtype: List[Subscription]
:rtype: List[SubscriptionTable]
"""
return (
SubscriptionTable.query.join(ProductTable)
......@@ -109,3 +109,21 @@ def get_active_subscriptions_by_field_and_value(field_name: str, field_value: st
.filter(SubscriptionTable.status == SubscriptionLifecycle.ACTIVE)
.all()
)
def count_incomplete_validate_products() -> int:
"""Count the number of incomplete validate_products processes.
Returns
-------
int
The count of incomplete 'validate_products' processes.
"""
return ProcessTable.query.filter(
ProcessTable.workflow_name == "validate_products", ProcessTable.last_status != "completed"
).count()
def get_insync_subscriptions() -> list[SubscriptionTable]:
"""Retrieve all subscriptions that are currently in sync."""
return SubscriptionTable.query.join(ProductTable).filter(SubscriptionTable.insync.is_(True)).all()
......@@ -22,6 +22,16 @@ class GeneralParams(BaseSettings):
proxy uses."""
class CeleryParams(BaseSettings):
"""Parameters for Celery."""
broker_url: str
result_backend: str
timezone: str = "Europe/Amsterdam"
enable_utc: bool = True
result_expires: int = 3600
class InfoBloxParams(BaseSettings):
"""Parameters related to InfoBlox."""
......@@ -104,6 +114,7 @@ class OSSParams(BaseSettings):
IPAM: IPAMParams
NETBOX: NetBoxParams
PROVISIONING_PROXY: ProvisioningProxyParams
CELERY: CeleryParams
def load_oss_params() -> OSSParams:
......
from celery import Celery
from gso import init_worker_app
from gso.settings import load_oss_params
class OrchestratorCelery(Celery):
def on_init(self) -> None:
init_worker_app()
settings = load_oss_params()
celery = OrchestratorCelery(
"worker",
broker=settings.CELERY.broker_url,
backend=settings.CELERY.result_backend,
include=[
"gso.schedules.task_vacuum",
"gso.schedules.validate_products",
"gso.schedules.validate_subscriptions",
],
)
celery.conf.update(result_expires=settings.CELERY.result_expires)
celery.conf.update(redbeat_redis_url=settings.CELERY.broker_url)
......@@ -79,7 +79,6 @@ def update_ipam_stub_for_subscription(
) -> State:
subscription.iptrunk.iptrunk_ipv4_network = iptrunk_ipv4_network
subscription.iptrunk.iptrunk_ipv6_network = iptrunk_ipv6_network
subscription.iptrunk.iptrunk_ipv6_network = iptrunk_ipv6_network
return {"subscription": subscription}
......
......@@ -3,16 +3,18 @@ requests==2.31.0
infoblox-client~=0.6.0
pycountry==22.3.5
pynetbox==7.2.0
celery-redbeat==2.1.1
celery==5.3.4
# Test and linting dependencies
pytest==7.4.2
faker==19.10.0
responses==0.23.3
black==23.9.1
pytest==7.4.3
faker==19.13.0
responses==0.24.0
black==23.10.1
isort==5.12.0
flake8==6.1.0
mypy==1.6.0
ruff==0.0.292
mypy==1.6.1
ruff==0.1.4
sphinx==7.2.6
sphinx-rtd-theme==1.3.0
urllib3_mock==0.3.3
\ No newline at end of file
......@@ -14,6 +14,8 @@ setup(
"infoblox-client~=0.6.0",
"pycountry==22.3.5",
"pynetbox==7.2.0",
"celery-redbeat==2.1.1",
"celery==5.3.4",
],
include_package_data=True,
)
File moved
#!/bin/sh
set -o errexit
set -o nounset
cd /app
celery -A gso.worker beat -l info -S redbeat.RedBeatScheduler