Commit 0e52e706 authored by Mohammad Torkashvand

Added celery worker

Added celery beat
Added workflow and task validation scheduling
parent c530567b
Merge request !101: Added celery worker and celery beat
Pipeline #84458 failed
Showing 313 additions and 39 deletions
@@ -9,6 +9,7 @@ oss-params.json
 .pytest_cache
 .ruff_cache
 .tox
+build/

 # Documentation
 docs/build
...
@@ -15,9 +15,12 @@ RUN pip install \
 # Set the environment variable for the translations directory
 ENV TRANSLATIONS_DIR=/app/gso/translations/

-COPY --chmod=755 entrypoint.sh /app/entrypoint.sh
+# Copy the shell scripts and ensure scripts do not have Windows line endings and make them executable
+COPY start-app.sh start-worker.sh start-scheduler.sh /app/
+RUN sed -i 's/\r$//' start-app.sh start-worker.sh start-scheduler.sh && \
+    chmod 755 start-app.sh start-worker.sh start-scheduler.sh

 RUN chown -R appuser:appgroup /app
 USER appuser
 EXPOSE 8080
-ENTRYPOINT ["/app/entrypoint.sh"]
+ENTRYPOINT ["/app/start-app.sh"]
-from typer import Typer
+import typer
+from orchestrator import OrchestratorCore
+from orchestrator.cli.main import app as cli_app
+from orchestrator.settings import AppSettings
+
+import gso.products  # noqa: F401
+import gso.workflows  # noqa: F401
+from gso.api import router as api_router
 from gso.cli import netbox

+base_settings = AppSettings()  # TODO check if this is correct
+
-def load_gso_cli(app: Typer) -> None:
+def init_gso_app() -> OrchestratorCore:
+    app = OrchestratorCore(base_settings=base_settings)
+    app.include_router(api_router, prefix="/api")
+    return app
+
+
+def init_worker_app() -> OrchestratorCore:
+    return OrchestratorCore(base_settings=base_settings)
+
+
+def init_cli_app() -> typer.Typer:
     from gso.cli import import_sites

-    app.add_typer(import_sites.app, name="import_sites")
-    app.add_typer(netbox.app, name="netbox-cli")
+    cli_app.add_typer(import_sites.app, name="import_sites")
+    cli_app.add_typer(netbox.app, name="netbox-cli")
+    return cli_app()
"""The main module that runs :term:`GSO`.""" """The main module that runs :term:`GSO`."""
import typer
from orchestrator import OrchestratorCore
from orchestrator.cli.main import app as core_cli
from orchestrator.settings import AppSettings
import gso.products # noqa: F401 from gso import init_cli_app, init_gso_app
import gso.workflows # noqa: F401
from gso import load_gso_cli
from gso.api import router as api_router
app = init_gso_app()
def init_gso_app(settings: AppSettings) -> OrchestratorCore:
app = OrchestratorCore(base_settings=settings)
app.include_router(api_router, prefix="/api")
return app
def init_cli_app() -> typer.Typer:
load_gso_cli(core_cli)
return core_cli()
app = init_gso_app(settings=AppSettings())
if __name__ == "__main__": if __name__ == "__main__":
init_cli_app() init_cli_app()
@@ -49,5 +49,10 @@
         "scheme": "https",
         "api_base": "localhost:44444",
         "api_version": 1123
+    },
+    "CELERY": {
+        "broker_url": "redis://localhost:6379/0",
+        "result_backend": "rpc://localhost:6379/0",
+        "result_expires": 3600
     }
 }
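For reference, a minimal sketch of how this new CELERY block is consumed, assuming the CeleryParams settings class and load_oss_params wiring added later in this commit:

from gso.settings import load_oss_params

# Load the OSS parameters and read the Celery connection settings that the
# worker and beat scheduler use; the printed values are the example values above.
settings = load_oss_params()
print(settings.CELERY.broker_url)      # "redis://localhost:6379/0"
print(settings.CELERY.result_expires)  # 3600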
from orchestrator.services.processes import start_process

from gso.schedules.scheduling import scheduler
from gso.worker import celery


@celery.task
@scheduler(name="Resume workflows", hour="*/1")
def run_resume_workflows() -> None:
    """Resume all workflows that are stuck on tasks with the status 'waiting'."""
    start_process("task_resume_workflows")
import inspect
from functools import wraps
from typing import Any, Callable
from celery import current_app
from celery.schedules import crontab
def scheduler(
    name: str,
    minute: str = "*",
    hour: str = "*",
    day_of_week: str = "*",
    day_of_month: str = "*",
    month_of_year: str = "*",
) -> Callable[[Callable], Callable]:
"""Crontab schedule.
A Crontab can be used as the ``run_every`` value of a
periodic task entry to add :manpage:`crontab(5)`-like scheduling.
Like a :manpage:`cron(5)`-job, you can specify units of time of when
you'd like the task to execute. It's a reasonably complete
implementation of :command:`cron`'s features, so it should provide a fair
degree of scheduling needs.
You can specify a minute, an hour, a day of the week, a day of the
month, and/or a month in the year in any of the following formats:
.. attribute:: minute
- A (list of) integers from 0-59 that represent the minutes of
an hour of when execution should occur; or
- A string representing a Crontab pattern. This may get pretty
advanced, like ``minute='*/15'`` (for every quarter) or
``minute='1,13,30-45,50-59/2'``.
.. attribute:: hour
- A (list of) integers from 0-23 that represent the hours of
a day of when execution should occur; or
- A string representing a Crontab pattern. This may get pretty
advanced, like ``hour='*/3'`` (for every three hours) or
``hour='0,8-17/2'`` (at midnight, and every two hours during
office hours).
.. attribute:: day_of_week
- A (list of) integers from 0-6, where Sunday = 0 and Saturday =
6, that represent the days of a week that execution should
occur.
- A string representing a Crontab pattern. This may get pretty
advanced, like ``day_of_week='mon-fri'`` (for weekdays only).
(Beware that ``day_of_week='*/2'`` does not literally mean
'every two days', but 'every day that is divisible by two'!)
.. attribute:: day_of_month
- A (list of) integers from 1-31 that represents the days of the
month that execution should occur.
- A string representing a Crontab pattern. This may get pretty
advanced, such as ``day_of_month='2-30/2'`` (for every even
numbered day) or ``day_of_month='1-7,15-21'`` (for the first and
third weeks of the month).
.. attribute:: month_of_year
- A (list of) integers from 1-12 that represents the months of
the year during which execution can occur.
- A string representing a Crontab pattern. This may get pretty
advanced, such as ``month_of_year='*/3'`` (for the first month
of every quarter) or ``month_of_year='2-12/2'`` (for every even
numbered month).
.. attribute:: nowfun
Function returning the current date and time
(:class:`~datetime.datetime`).
.. attribute:: app
The Celery app instance.
It's important to realize that any day on which execution should
occur must be represented by entries in all three of the day and
month attributes. For example, if ``day_of_week`` is 0 and
``day_of_month`` is every seventh day, only months that begin
on Sunday and are also in the ``month_of_year`` attribute will have
execution events. Or, ``day_of_week`` is 1 and ``day_of_month``
is '1-7,15-21' means every first and third Monday of every month
present in ``month_of_year``.
"""
    def decorator(task_func: Callable) -> Callable:
        @wraps(task_func)
        def scheduled_task(*args: Any, **kwargs: Any) -> Any:
            return task_func(*args, **kwargs)

        module = inspect.getmodule(task_func)
        if module is None:
            raise ValueError(f"Module for the task function {task_func.__name__} could not be found.")

        task_path = f"{module.__name__}.{task_func.__name__}"

        current_app.conf.beat_schedule[task_func.__name__] = {
            "name": name,
            "task": task_path,
            "schedule": crontab(
                minute=minute,
                hour=hour,
                day_of_month=day_of_month,
                month_of_year=month_of_year,
                day_of_week=day_of_week,
            ),
        }

        return scheduled_task

    return decorator
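The decorator only registers the wrapped function in Celery's beat schedule; a rough hand-written equivalent, using the "Resume workflows" task above as the illustrative example (entry key and module path shown here are illustrative), would be:

from celery.schedules import crontab

from gso.worker import celery

# Roughly what @scheduler(name="Resume workflows", hour="*/1") produces for
# gso.schedules.resume_workflows.run_resume_workflows.
celery.conf.beat_schedule["run_resume_workflows"] = {
    "name": "Resume workflows",
    "task": "gso.schedules.resume_workflows.run_resume_workflows",
    "schedule": crontab(minute="*", hour="*/1"),
}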
from orchestrator.services.processes import start_process

from gso.schedules.scheduling import scheduler
from gso.worker import celery


@celery.task
@scheduler(name="Clean up tasks", hour="*/6")
def vacuum_tasks() -> None:
    start_process("task_clean_up_tasks")
from orchestrator.services.processes import start_process

from gso.schedules.scheduling import scheduler
from gso.services.subscriptions import count_incomplete_validate_products
from gso.worker import celery


@celery.task
@scheduler(name="Validate Products and inactive subscriptions", minute="30", hour="2")
def validate_products() -> None:
    if count_incomplete_validate_products() > 0:
        start_process("task_validate_products")
import structlog
from orchestrator.services.processes import get_execution_context
from orchestrator.services.subscriptions import TARGET_DEFAULT_USABLE_MAP, WF_USABLE_MAP
from orchestrator.targets import Target

from gso.schedules.scheduling import scheduler
from gso.services.subscriptions import get_insync_subscriptions
from gso.worker import celery

logger = structlog.get_logger(__name__)


@celery.task
@scheduler(name="Subscriptions Validator", minute="10", hour="0")
def validate_subscriptions() -> None:
    for subscription in get_insync_subscriptions():
        validation_workflow = None

        for workflow in subscription.product.workflows:
            if workflow.target == Target.SYSTEM:
                validation_workflow = workflow.name

        if validation_workflow:
            default = TARGET_DEFAULT_USABLE_MAP[Target.SYSTEM]
            usable_when = WF_USABLE_MAP.get(validation_workflow, default)

            if subscription.status in usable_when:
                json = [{"subscription_id": str(subscription.subscription_id)}]
                validate_func = get_execution_context()["validate"]
                validate_func(validation_workflow, json=json)
        else:
            logger.warning(
                "SubscriptionTable has no validation workflow",
                subscription=subscription,
                product=subscription.product.name,
            )
@@ -2,13 +2,13 @@ from typing import Any
 from uuid import UUID

 from orchestrator.db import (
+    ProcessTable,
     ProductTable,
     ResourceTypeTable,
     SubscriptionInstanceTable,
     SubscriptionInstanceValueTable,
     SubscriptionTable,
 )
-from orchestrator.graphql.schemas.subscription import Subscription
 from orchestrator.types import SubscriptionLifecycle

 from gso.products import ProductType
@@ -87,7 +87,7 @@ def get_product_id_by_name(product_name: ProductType) -> UUID:
     return ProductTable.query.filter_by(name=product_name).first().product_id


-def get_active_subscriptions_by_field_and_value(field_name: str, field_value: str) -> list[Subscription]:
+def get_active_subscriptions_by_field_and_value(field_name: str, field_value: str) -> list[SubscriptionTable]:
     """Retrieve a list of active subscriptions based on a specified field and its value.

     :param field_name: The name of the field to filter by.
@@ -97,7 +97,7 @@ def get_active_subscriptions_by_field_and_value(field_name: str, field_value: st
     :type field_value: Any

     :return: A list of active Subscription objects that match the criteria.
-    :rtype: List[Subscription]
+    :rtype: List[SubscriptionTable]
     """
     return (
         SubscriptionTable.query.join(ProductTable)
@@ -109,3 +109,21 @@ def get_active_subscriptions_by_field_and_value(field_name: str, field_value: st
         .filter(SubscriptionTable.status == SubscriptionLifecycle.ACTIVE)
         .all()
     )
+
+
+def count_incomplete_validate_products() -> int:
+    """Count the number of incomplete validate_products processes.
+
+    Returns
+    -------
+    int
+        The count of incomplete 'validate_products' processes.
+    """
+    return ProcessTable.query.filter(
+        ProcessTable.workflow_name == "validate_products", ProcessTable.last_status != "completed"
+    ).count()
+
+
+def get_insync_subscriptions() -> list[SubscriptionTable]:
+    """Retrieve all subscriptions that are currently in sync."""
+    return SubscriptionTable.query.join(ProductTable).filter(SubscriptionTable.insync.is_(True)).all()
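A small usage sketch of the changed helper; the field name and value below are hypothetical, and after this change callers receive SubscriptionTable rows rather than GraphQL Subscription objects:

from gso.services.subscriptions import get_active_subscriptions_by_field_and_value

# Hypothetical field/value, for illustration only.
for subscription in get_active_subscriptions_by_field_and_value("iptrunk_description", "my-trunk"):
    print(subscription.subscription_id, subscription.status)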
@@ -22,6 +22,16 @@ class GeneralParams(BaseSettings):
     proxy uses."""


+class CeleryParams(BaseSettings):
+    """Parameters for Celery."""
+
+    broker_url: str
+    result_backend: str
+    timezone: str = "Europe/Amsterdam"
+    enable_utc: bool = True
+    result_expires: int = 3600
+
+
 class InfoBloxParams(BaseSettings):
     """Parameters related to InfoBlox."""

@@ -104,6 +114,7 @@ class OSSParams(BaseSettings):
     IPAM: IPAMParams
     NETBOX: NetBoxParams
     PROVISIONING_PROXY: ProvisioningProxyParams
+    CELERY: CeleryParams


 def load_oss_params() -> OSSParams:
...
gso/worker.py 0 → 100644
from celery import Celery

from gso import init_worker_app
from gso.settings import load_oss_params


class OrchestratorCelery(Celery):
    def on_init(self) -> None:
        init_worker_app()


settings = load_oss_params()

celery = OrchestratorCelery(
    "worker",
    broker=settings.CELERY.broker_url,
    backend=settings.CELERY.result_backend,
    include=[
        "gso.schedules.task_vacuum",
        "gso.schedules.validate_products",
        "gso.schedules.resume_workflows",
        "gso.schedules.validate_subscriptions",
    ],
)

celery.conf.update(result_expires=settings.CELERY.result_expires)
celery.conf.update(redbeat_redis_url=settings.CELERY.broker_url)
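As a quick smoke test, assuming the Redis broker from the CELERY settings is reachable and a worker has been started via start-worker.sh, any of the included scheduled tasks can also be queued ad hoc instead of waiting for celery beat:

from gso.schedules.resume_workflows import run_resume_workflows

# Enqueue the task once; a running worker picks it up from the broker.
result = run_resume_workflows.delay()
print(result.id)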
@@ -79,7 +79,6 @@ def update_ipam_stub_for_subscription(
 ) -> State:
     subscription.iptrunk.iptrunk_ipv4_network = iptrunk_ipv4_network
     subscription.iptrunk.iptrunk_ipv6_network = iptrunk_ipv6_network
-    subscription.iptrunk.iptrunk_ipv6_network = iptrunk_ipv6_network

     return {"subscription": subscription}
...
-orchestrator-core==1.3.4
+orchestrator-core[celery]==1.3.4
 requests==2.31.0
 infoblox-client~=0.6.0
 pycountry==22.3.5
 pynetbox==7.2.0
+celery-redbeat==2.1.1

 # Test and linting dependencies
-pytest==7.4.2
+pytest==7.4.3
-faker==19.10.0
+faker==19.13.0
-responses==0.23.3
+responses==0.24.0
-black==23.9.1
+black==23.10.1
 isort==5.12.0
 flake8==6.1.0
-mypy==1.6.0
+mypy==1.6.1
-ruff==0.0.292
+ruff==0.1.4
 sphinx==7.2.6
 sphinx-rtd-theme==1.3.0
 urllib3_mock==0.3.3
\ No newline at end of file
File moved
#!/bin/sh
set -o errexit
set -o nounset
cd /app
celery -A gso.worker beat -l debug -S redbeat.RedBeatScheduler
#!/bin/sh
set -o errexit
set -o nounset
cd /app
celery -A gso.worker worker --loglevel=info
\ No newline at end of file
@@ -132,6 +132,11 @@ def configuration_data() -> dict:
         "auth": "Bearer <token>",
         "api_version": 1123,
     },
+    "CELERY": {
+        "broker_url": "redis://localhost:6379",
+        "result_backend": "rpc://localhost:6379/0",
+        "result_expires": 3600,
+    },
 }
@@ -275,7 +280,7 @@ def fastapi_app(database, db_uri):
     oauth2lib_settings.OAUTH2_ACTIVE = False
     oauth2lib_settings.ENVIRONMENT_IGNORE_MUTATION_DISABLED = ["local", "TESTING"]
     app_settings.DATABASE_URI = db_uri
-    return init_gso_app(settings=app_settings)
+    return init_gso_app()


 @pytest.fixture(scope="session")
...