diff --git a/.gitignore b/.gitignore
index 792183231acc09723dd827d600ee40da51b6fe72..bd625c22249fbc97ba8eec1a5218c373c1ea0cef 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,7 @@ oss-params.json
 .pytest_cache
 .ruff_cache
 .tox
+build/
 
 # Documentation
 docs/build
diff --git a/Dockerfile b/Dockerfile
index 76c3cf21d2843e27c15321c66f0c1711a258f49b..d97b10aeafa1a998850838898df42b2997d5b63e 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -15,9 +15,12 @@ RUN pip install \
 # Set the environment variable for the translations directory
 ENV TRANSLATIONS_DIR=/app/gso/translations/
 
-COPY --chmod=755 entrypoint.sh /app/entrypoint.sh
+# Copy the shell scripts, strip any Windows line endings, and make them executable
+COPY start-app.sh start-worker.sh start-scheduler.sh /app/
+RUN sed -i 's/\r$//' start-app.sh start-worker.sh start-scheduler.sh && \
+    chmod 755 start-app.sh start-worker.sh start-scheduler.sh
 RUN chown -R appuser:appgroup /app
 USER appuser
 
 EXPOSE 8080
-ENTRYPOINT ["/app/entrypoint.sh"]
+ENTRYPOINT ["/app/start-app.sh"]
diff --git a/gso/__init__.py b/gso/__init__.py
index 072559de81c3b3a25900bbff4fc9c905f95adcc9..d12e0a772b1979e5536ba232fef4892f66bdc7f3 100644
--- a/gso/__init__.py
+++ b/gso/__init__.py
@@ -1,10 +1,29 @@
-from typer import Typer
+import typer
+from orchestrator import OrchestratorCore
+from orchestrator.cli.main import app as cli_app
+from orchestrator.settings import AppSettings
 
+import gso.products  # noqa: F401
+import gso.workflows  # noqa: F401
+from gso.api import router as api_router
 from gso.cli import netbox
 
+base_settings = AppSettings()  # TODO check if this is correct
 
-def load_gso_cli(app: Typer) -> None:
+
+def init_gso_app() -> OrchestratorCore:
+    app = OrchestratorCore(base_settings=base_settings)
+    app.include_router(api_router, prefix="/api")
+    return app
+
+
+def init_worker_app() -> OrchestratorCore:
+    return OrchestratorCore(base_settings=base_settings)
+
+
+def init_cli_app() -> typer.Typer:
     from gso.cli import import_sites
 
-    app.add_typer(import_sites.app, name="import_sites")
-    app.add_typer(netbox.app, name="netbox-cli")
+    cli_app.add_typer(import_sites.app, name="import_sites")
+    cli_app.add_typer(netbox.app, name="netbox-cli")
+    return cli_app()
diff --git a/gso/main.py b/gso/main.py
index 112bd535c9df2054cae59225d3c3d16f9c38e242..20098b12ff3007f23ceca4e33d804ad64dfca8b2 100644
--- a/gso/main.py
+++ b/gso/main.py
@@ -1,27 +1,8 @@
 """The main module that runs :term:`GSO`."""
 
-import typer
-from orchestrator import OrchestratorCore
-from orchestrator.cli.main import app as core_cli
-from orchestrator.settings import AppSettings
-import gso.products  # noqa: F401
-import gso.workflows  # noqa: F401
-from gso import load_gso_cli
-from gso.api import router as api_router
+from gso import init_cli_app, init_gso_app
 
-
-def init_gso_app(settings: AppSettings) -> OrchestratorCore:
-    app = OrchestratorCore(base_settings=settings)
-    app.include_router(api_router, prefix="/api")
-    return app
-
-
-def init_cli_app() -> typer.Typer:
-    load_gso_cli(core_cli)
-    return core_cli()
-
-
-app = init_gso_app(settings=AppSettings())
+app = init_gso_app()
 
 if __name__ == "__main__":
     init_cli_app()
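The refactor above moves application construction from `gso/main.py` into `gso/__init__.py`, giving the API, the Celery worker, and the CLI their own initialisers. As a rough sketch of how the API flavour is consumed (not part of this diff; it assumes `uvicorn` is installed, with port 8080 taken from the Dockerfile's `EXPOSE`):

```python
"""Hedged sketch: serve the GSO API returned by init_gso_app() with uvicorn."""
import uvicorn

from gso import init_gso_app

# OrchestratorCore subclasses FastAPI, so init_gso_app() returns a normal ASGI app.
app = init_gso_app()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)
```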
"redis://localhost:6379/0", + "result_backend": "rpc://localhost:6379/0", + "result_expires": 3600 } } diff --git a/gso/schedules/__init__.py b/gso/schedules/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/gso/schedules/resume_workflows.py b/gso/schedules/resume_workflows.py new file mode 100644 index 0000000000000000000000000000000000000000..2c168313ffeda2c87d709e4411805971bf29bd34 --- /dev/null +++ b/gso/schedules/resume_workflows.py @@ -0,0 +1,11 @@ +from orchestrator.services.processes import start_process + +from gso.schedules.scheduling import scheduler +from gso.worker import celery + + +@celery.task +@scheduler(name="Resume workflows", hour="*/1") +def run_resume_workflows() -> None: + """Resume all workflows that are stuck on tasks with the status 'waiting'.""" + start_process("task_resume_workflows") diff --git a/gso/schedules/scheduling.py b/gso/schedules/scheduling.py new file mode 100644 index 0000000000000000000000000000000000000000..5400133f0d5d1214793055c12848290fa4b6c5f8 --- /dev/null +++ b/gso/schedules/scheduling.py @@ -0,0 +1,118 @@ +import inspect +from functools import wraps +from typing import Any, Callable + +from celery import current_app +from celery.schedules import crontab + + +def scheduler( + name: str, + minute: str = "*", + hour: str = "*", + day_of_week: str = "*", + day_of_month: str = "*", + month_of_year: str = "*", +) -> Callable[[Callable], Callable]: + """Crontab schedule. + + A Crontab can be used as the ``run_every`` value of a + periodic task entry to add :manpage:`crontab(5)`-like scheduling. + + Like a :manpage:`cron(5)`-job, you can specify units of time of when + you'd like the task to execute. It's a reasonably complete + implementation of :command:`cron`'s features, so it should provide a fair + degree of scheduling needs. + + You can specify a minute, an hour, a day of the week, a day of the + month, and/or a month in the year in any of the following formats: + + .. attribute:: minute + + - A (list of) integers from 0-59 that represent the minutes of + an hour of when execution should occur; or + - A string representing a Crontab pattern. This may get pretty + advanced, like ``minute='*/15'`` (for every quarter) or + ``minute='1,13,30-45,50-59/2'``. + + .. attribute:: hour + + - A (list of) integers from 0-23 that represent the hours of + a day of when execution should occur; or + - A string representing a Crontab pattern. This may get pretty + advanced, like ``hour='*/3'`` (for every three hours) or + ``hour='0,8-17/2'`` (at midnight, and every two hours during + office hours). + + .. attribute:: day_of_week + + - A (list of) integers from 0-6, where Sunday = 0 and Saturday = + 6, that represent the days of a week that execution should + occur. + - A string representing a Crontab pattern. This may get pretty + advanced, like ``day_of_week='mon-fri'`` (for weekdays only). + (Beware that ``day_of_week='*/2'`` does not literally mean + 'every two days', but 'every day that is divisible by two'!) + + .. attribute:: day_of_month + + - A (list of) integers from 1-31 that represents the days of the + month that execution should occur. + - A string representing a Crontab pattern. This may get pretty + advanced, such as ``day_of_month='2-30/2'`` (for every even + numbered day) or ``day_of_month='1-7,15-21'`` (for the first and + third weeks of the month). + + .. 
diff --git a/gso/schedules/task_vacuum.py b/gso/schedules/task_vacuum.py
new file mode 100644
index 0000000000000000000000000000000000000000..ef90479e33d6ffe5d20f8b246aa4597cd57e8b48
--- /dev/null
+++ b/gso/schedules/task_vacuum.py
@@ -0,0 +1,10 @@
+from orchestrator.services.processes import start_process
+
+from gso.schedules.scheduling import scheduler
+from gso.worker import celery
+
+
+@celery.task
+@scheduler(name="Clean up tasks", hour="*/6")
+def vacuum_tasks() -> None:
+    start_process("task_clean_up_tasks")
diff --git a/gso/schedules/validate_products.py b/gso/schedules/validate_products.py
new file mode 100644
index 0000000000000000000000000000000000000000..4140df8efaf6738fbf8e4375bef8b12befb1398a
--- /dev/null
+++ b/gso/schedules/validate_products.py
@@ -0,0 +1,12 @@
+from orchestrator.services.processes import start_process
+
+from gso.schedules.scheduling import scheduler
+from gso.services.subscriptions import count_incomplete_validate_products
+from gso.worker import celery
+
+
+@celery.task
+@scheduler(name="Validate Products and inactive subscriptions", minute="30", hour="2")
+def validate_products() -> None:
+    if count_incomplete_validate_products() > 0:
+        start_process("task_validate_products")
diff --git a/gso/schedules/validate_subscriptions.py b/gso/schedules/validate_subscriptions.py
new file mode 100644
index 0000000000000000000000000000000000000000..e17ee449c4146296fb91d830f51dd6333016fb11
--- /dev/null
+++ b/gso/schedules/validate_subscriptions.py
@@ -0,0 +1,37 @@
+import structlog
+from orchestrator.services.processes import get_execution_context
+from orchestrator.services.subscriptions import TARGET_DEFAULT_USABLE_MAP, WF_USABLE_MAP
+from orchestrator.targets import Target
+
+from gso.schedules.scheduling import scheduler
+from gso.services.subscriptions import get_insync_subscriptions
+from gso.worker import celery
+
+logger = structlog.get_logger(__name__)
+
+
+@celery.task
+@scheduler(name="Subscriptions Validator", minute="10", hour="0")
+def validate_subscriptions() -> None:
+    for subscription in get_insync_subscriptions():
+        validation_workflow = None
+
+        for workflow in subscription.product.workflows:
+            if workflow.target == Target.SYSTEM:
+                validation_workflow = workflow.name
+
+        if validation_workflow:
+            default = TARGET_DEFAULT_USABLE_MAP[Target.SYSTEM]
+            usable_when = WF_USABLE_MAP.get(validation_workflow, default)
+
+            if subscription.status in usable_when:
+                json = [{"subscription_id": str(subscription.subscription_id)}]
+
+                validate_func = get_execution_context()["validate"]
+                validate_func(validation_workflow, json=json)
+        else:
+            logger.warning(
+                "SubscriptionTable has no validation workflow",
+                subscription=subscription,
+                product=subscription.product.name,
+            )
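Since `@celery.task` wraps the scheduled function, these modules still expose ordinary Celery tasks, so they can also be triggered outside the beat schedule. A hedged sketch (assuming the broker from the `CELERY` settings is reachable and a working orchestrator setup is available):

```python
from gso.schedules.task_vacuum import vacuum_tasks

# Queue the task on the broker for a worker to pick up.
vacuum_tasks.delay()

# Or run it eagerly in the current process (this still calls start_process,
# so it needs the orchestrator database); useful when debugging.
vacuum_tasks.apply(throw=True)
```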
diff --git a/gso/services/subscriptions.py b/gso/services/subscriptions.py
index cffa645fc5eb949f9425798e3319c6af10ff50f8..42c57eb244ad20e3c4eaf6dcf27345ce119b9109 100644
--- a/gso/services/subscriptions.py
+++ b/gso/services/subscriptions.py
@@ -2,13 +2,13 @@ from typing import Any
 from uuid import UUID
 
 from orchestrator.db import (
+    ProcessTable,
     ProductTable,
     ResourceTypeTable,
     SubscriptionInstanceTable,
     SubscriptionInstanceValueTable,
     SubscriptionTable,
 )
-from orchestrator.graphql.schemas.subscription import Subscription
 from orchestrator.types import SubscriptionLifecycle
 
 from gso.products import ProductType
@@ -87,7 +87,7 @@ def get_product_id_by_name(product_name: ProductType) -> UUID:
     return ProductTable.query.filter_by(name=product_name).first().product_id
 
 
-def get_active_subscriptions_by_field_and_value(field_name: str, field_value: str) -> list[Subscription]:
+def get_active_subscriptions_by_field_and_value(field_name: str, field_value: str) -> list[SubscriptionTable]:
     """Retrieve a list of active subscriptions based on a specified field and its value.
 
     :param field_name: The name of the field to filter by.
@@ -97,7 +97,7 @@ def get_active_subscriptions_by_field_and_value(field_name: str, field_value: st
     :type field_value: Any
 
     :return: A list of active Subscription objects that match the criteria.
-    :rtype: List[Subscription]
+    :rtype: List[SubscriptionTable]
     """
     return (
         SubscriptionTable.query.join(ProductTable)
@@ -109,3 +109,21 @@ def get_active_subscriptions_by_field_and_value(field_name: str, field_value: st
         .filter(SubscriptionTable.status == SubscriptionLifecycle.ACTIVE)
         .all()
     )
+
+
+def count_incomplete_validate_products() -> int:
+    """Count the number of incomplete validate_products processes.
+
+    Returns
+    -------
+    int
+        The count of incomplete 'validate_products' processes.
+    """
+    return ProcessTable.query.filter(
+        ProcessTable.workflow_name == "validate_products", ProcessTable.last_status != "completed"
+    ).count()
+
+
+def get_insync_subscriptions() -> list[SubscriptionTable]:
+    """Retrieve all subscriptions that are currently in sync."""
+    return SubscriptionTable.query.join(ProductTable).filter(SubscriptionTable.insync.is_(True)).all()
diff --git a/gso/settings.py b/gso/settings.py
index d7b48048872b8de9cc4109fb3eb969ddc41f8ca4..8ccffc31e74656260538766f4e5c955c6700c16b 100644
--- a/gso/settings.py
+++ b/gso/settings.py
@@ -22,6 +22,16 @@ class GeneralParams(BaseSettings):
     proxy uses."""
 
 
+class CeleryParams(BaseSettings):
+    """Parameters for Celery."""
+
+    broker_url: str
+    result_backend: str
+    timezone: str = "Europe/Amsterdam"
+    enable_utc: bool = True
+    result_expires: int = 3600
+
+
 class InfoBloxParams(BaseSettings):
     """Parameters related to InfoBlox."""
 
@@ -104,6 +114,7 @@ class OSSParams(BaseSettings):
     IPAM: IPAMParams
     NETBOX: NetBoxParams
     PROVISIONING_PROXY: ProvisioningProxyParams
+    CELERY: CeleryParams
 
 
 def load_oss_params() -> OSSParams:
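The new `CeleryParams` section is loaded through the same `load_oss_params()` path as the rest of the OSS parameters, so the values from the JSON example above surface as plain attributes. A small sketch, assuming the OSS parameters file is configured as usual:

```python
from gso.settings import load_oss_params

settings = load_oss_params()

print(settings.CELERY.broker_url)      # e.g. "redis://localhost:6379/0"
print(settings.CELERY.timezone)        # defaults to "Europe/Amsterdam" when not set
print(settings.CELERY.result_expires)  # 3600 in the example file
```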
+ """ + return ProcessTable.query.filter( + ProcessTable.workflow_name == "validate_products", ProcessTable.last_status != "completed" + ).count() + + +def get_insync_subscriptions() -> list[SubscriptionTable]: + """Retrieve all subscriptions that are currently in sync.""" + return SubscriptionTable.query.join(ProductTable).filter(SubscriptionTable.insync.is_(True)).all() diff --git a/gso/settings.py b/gso/settings.py index d7b48048872b8de9cc4109fb3eb969ddc41f8ca4..8ccffc31e74656260538766f4e5c955c6700c16b 100644 --- a/gso/settings.py +++ b/gso/settings.py @@ -22,6 +22,16 @@ class GeneralParams(BaseSettings): proxy uses.""" +class CeleryParams(BaseSettings): + """Parameters for Celery.""" + + broker_url: str + result_backend: str + timezone: str = "Europe/Amsterdam" + enable_utc: bool = True + result_expires: int = 3600 + + class InfoBloxParams(BaseSettings): """Parameters related to InfoBlox.""" @@ -104,6 +114,7 @@ class OSSParams(BaseSettings): IPAM: IPAMParams NETBOX: NetBoxParams PROVISIONING_PROXY: ProvisioningProxyParams + CELERY: CeleryParams def load_oss_params() -> OSSParams: diff --git a/gso/worker.py b/gso/worker.py new file mode 100644 index 0000000000000000000000000000000000000000..cd0234ef92c603f1ce296c7b31f91ccdf98176af --- /dev/null +++ b/gso/worker.py @@ -0,0 +1,27 @@ +from celery import Celery + +from gso import init_worker_app +from gso.settings import load_oss_params + + +class OrchestratorCelery(Celery): + def on_init(self) -> None: + init_worker_app() + + +settings = load_oss_params() + +celery = OrchestratorCelery( + "worker", + broker=settings.CELERY.broker_url, + backend=settings.CELERY.result_backend, + include=[ + "gso.schedules.task_vacuum", + "gso.schedules.validate_products", + "gso.schedules.resume_workflows", + "gso.schedules.validate_subscriptions", + ], +) + +celery.conf.update(result_expires=settings.CELERY.result_expires) +celery.conf.update(redbeat_redis_url=settings.CELERY.broker_url) diff --git a/gso/workflows/tasks/import_iptrunk.py b/gso/workflows/tasks/import_iptrunk.py index 4be9b2c273745da35335b5cce127f9c8a45488a0..04f583539ed364d87568fc04066a44e2a2d90141 100644 --- a/gso/workflows/tasks/import_iptrunk.py +++ b/gso/workflows/tasks/import_iptrunk.py @@ -79,7 +79,6 @@ def update_ipam_stub_for_subscription( ) -> State: subscription.iptrunk.iptrunk_ipv4_network = iptrunk_ipv4_network subscription.iptrunk.iptrunk_ipv6_network = iptrunk_ipv6_network - subscription.iptrunk.iptrunk_ipv6_network = iptrunk_ipv6_network return {"subscription": subscription} diff --git a/requirements.txt b/requirements.txt index 3f00e088e1fb577e9e07409fa61fcf072962b87f..99febf64b612c3711f9ce6ce0c92d35795606f9c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,18 +1,19 @@ -orchestrator-core==1.3.4 +orchestrator-core[celery]==1.3.4 requests==2.31.0 infoblox-client~=0.6.0 pycountry==22.3.5 pynetbox==7.2.0 +celery-redbeat==2.1.1 # Test and linting dependencies -pytest==7.4.2 -faker==19.10.0 -responses==0.23.3 -black==23.9.1 +pytest==7.4.3 +faker==19.13.0 +responses==0.24.0 +black==23.10.1 isort==5.12.0 flake8==6.1.0 -mypy==1.6.0 -ruff==0.0.292 +mypy==1.6.1 +ruff==0.1.4 sphinx==7.2.6 sphinx-rtd-theme==1.3.0 urllib3_mock==0.3.3 \ No newline at end of file diff --git a/entrypoint.sh b/start-app.sh similarity index 100% rename from entrypoint.sh rename to start-app.sh diff --git a/start-sheduler.sh b/start-sheduler.sh new file mode 100755 index 0000000000000000000000000000000000000000..0677372e06743c6ee70743e0297c873e47c500a5 --- /dev/null +++ b/start-sheduler.sh @@ -0,0 
diff --git a/start-worker.sh b/start-worker.sh
new file mode 100755
index 0000000000000000000000000000000000000000..f3dbbac3805274f0064de911e0ce1fca5d601b77
--- /dev/null
+++ b/start-worker.sh
@@ -0,0 +1,8 @@
+#!/bin/sh
+
+set -o errexit
+set -o nounset
+
+cd /app
+
+celery -A gso.worker worker --loglevel=info
\ No newline at end of file
diff --git a/test/conftest.py b/test/conftest.py
index 85cfa39f97ac6c20aad797ec198643481ee3fef7..58f5664bec8b4131e3c17a197f2448c120ba0381 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -132,6 +132,11 @@ def configuration_data() -> dict:
             "auth": "Bearer <token>",
             "api_version": 1123,
         },
+        "CELERY": {
+            "broker_url": "redis://localhost:6379",
+            "result_backend": "rpc://localhost:6379/0",
+            "result_expires": 3600,
+        },
     }
 
 
@@ -275,7 +280,7 @@ def fastapi_app(database, db_uri):
     oauth2lib_settings.OAUTH2_ACTIVE = False
     oauth2lib_settings.ENVIRONMENT_IGNORE_MUTATION_DISABLED = ["local", "TESTING"]
     app_settings.DATABASE_URI = db_uri
-    return init_gso_app(settings=app_settings)
+    return init_gso_app()
 
 
 @pytest.fixture(scope="session")
diff --git a/test/schedules/__init__.py b/test/schedules/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/test/schedules/test_scheduling.py b/test/schedules/test_scheduling.py
new file mode 100644
index 0000000000000000000000000000000000000000..531f20566d05eff1f7023393f91e4c701a4896f3
--- /dev/null
+++ b/test/schedules/test_scheduling.py
@@ -0,0 +1,37 @@
+from unittest.mock import patch
+
+import pytest
+
+from gso.schedules.scheduling import scheduler
+
+
+@pytest.fixture
+def mock_celery():
+    with patch("gso.schedules.scheduling.current_app") as mock_app:
+        yield mock_app
+
+
+def test_scheduler_updates_beat_schedule(mock_celery):
+    mock_celery.conf.beat_schedule = {}
+
+    @scheduler(name="A cool task", minute="0", hour="0", day_of_week="*", day_of_month="*", month_of_year="*")
+    def mock_task():
+        return "task result"
+
+    assert "mock_task" in mock_celery.conf.beat_schedule
+    scheduled = mock_celery.conf.beat_schedule["mock_task"]
+    assert scheduled["schedule"].minute == {0}
+    assert scheduled["schedule"].hour == {0}
+    assert scheduled["task"] == "test.schedules.test_scheduling.mock_task"
+    assert scheduled["name"] == "A cool task"
+
+
+def test_scheduled_task_still_works():
+    """Ensure that the scheduler decorator does not change the behavior of the function it decorates."""
+
+    @scheduler(name="A cool task", minute="0", hour="0", day_of_week="*", day_of_month="*", month_of_year="*")
+    def mock_task():
+        return "task result"
+
+    result = mock_task()
+    assert result == "task result"
diff --git a/tox.ini b/tox.ini
index 2e7c3a9f6468e0360a260f76f2be257d06213219..ebef98847a857c15c9a5953f7d0f1d0a6edc7ee2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -22,6 +22,7 @@ deps =
     ruff
     isort
     types-requests
+    celery-stubs
     -r requirements.txt
 
 commands =