Skip to content
Snippets Groups Projects
__init__.py 3.03 KiB
"""The main entrypoint for GSO, and the different ways in which it can be run."""

from importlib import metadata
from os import getenv

import sentry_sdk
import typer
from celery import Celery
from orchestrator import OrchestratorCore, app_settings
from orchestrator.cli.main import app as cli_app
from orchestrator.graphql import SCALAR_OVERRIDES
from orchestrator.graphql.resolvers.version import VERSIONS
from orchestrator.services.tasks import initialise_celery

# noinspection PyUnresolvedReferences
import gso.products
import gso.workflows  # noqa: F401
from gso.api import router as api_router
from gso.auth.oidc import oidc_instance
from gso.auth.opa import graphql_opa_instance, opa_instance
from gso.graphql_api.resolvers.customer import custom_subscription_interface
from gso.graphql_api.types import GSO_SCALAR_OVERRIDES
from gso.settings import celery_settings, load_oss_params

SCALAR_OVERRIDES.update(GSO_SCALAR_OVERRIDES)
VERSIONS.extend([
    f"GÉANT Service Orchestrator: {metadata.version("geant-service-orchestrator")}",
    f"GÉANT Service Orchestrator GUI: {getenv("GSO_GUI_VERSION", "Unknown")}",
    f"LSO: {getenv("LSO_VERSION", "Unknown")}",
    f"GAP Ansible collection: {getenv("GAP_ANSIBLE_COLLECTION_VERSION", "Unknown")}",
    f"Moodi Ansible collection: {getenv("MOODI_ANSIBLE_COLLECTION_VERSION", "Unknown")}",
])


def gso_initialise_celery(celery: Celery) -> None:
    """Initialise the Celery app."""
    initialise_celery(celery)
    celery.conf.task_routes = {}


def init_gso_app() -> OrchestratorCore:
    """Initialise the GSO app."""
    app = OrchestratorCore(base_settings=app_settings)
    app.register_authentication(oidc_instance)
    app.register_authorization(opa_instance)
    app.register_graphql_authorization(graphql_opa_instance)
    app.register_graphql(subscription_interface=custom_subscription_interface)
    app.include_router(api_router, prefix="/api")

    celery = Celery(
        "geant-service-orchestrator",
        broker=celery_settings.broker_url,
        backend=celery_settings.result_backend,
        include=["orchestrator.services.tasks", "gso.tasks.start_process"],
    )
    celery.conf.update(
        result_expires=celery_settings.result_expires,
        task_always_eager=app_settings.TESTING,
        task_eager_propagates=app_settings.TESTING,
    )
    gso_initialise_celery(celery)

    return app


def init_cli_app() -> typer.Typer:
    """Initialise GSO as a CLI application."""
    from gso.cli import imports, netbox, schedule  # noqa: PLC0415

    cli_app.add_typer(imports.app, name="import-cli")
    cli_app.add_typer(netbox.app, name="netbox-cli")
    cli_app.add_typer(schedule.app, name="schedule-cli")
    return cli_app()


def init_sentry() -> None:
    """Only initialize Sentry if not in testing mode."""
    if not app_settings.TESTING and (sentry_params := load_oss_params().SENTRY):
        sentry_sdk.init(
            dsn=sentry_params.DSN, environment=load_oss_params().GENERAL.environment, traces_sample_rate=1.0
        )


init_sentry()

__all__ = ["init_cli_app", "init_gso_app", "init_sentry"]