# Karel van Klink authored
# __init__.py 1.80 KiB
"""The main entrypoint for :term:`GSO`, and the different ways in which it can be run."""
import os
import sentry_sdk
import typer
from orchestrator import OrchestratorCore, app_settings
from orchestrator.cli.main import app as cli_app
from orchestrator.graphql import SCALAR_OVERRIDES
# noinspection PyUnresolvedReferences
import gso.products
import gso.workflows # noqa: F401
from gso.api import router as api_router
from gso.auth.oidc import oidc_instance
from gso.auth.opa import graphql_opa_instance, opa_instance
from gso.graphql_api.types import GSO_SCALAR_OVERRIDES
from gso.settings import load_oss_params
# Merge the GSO-specific scalar overrides into the orchestrator's GraphQL scalar overrides. This runs at import time,
# so it has already happened by the time any of the ``init_*`` functions below can be called.
SCALAR_OVERRIDES.update(GSO_SCALAR_OVERRIDES)
def init_gso_app() -> OrchestratorCore:
    """Build the :term:`GSO` web application.

    An ``OrchestratorCore`` is created from ``app_settings``. On it, in this order, the OIDC instance is registered
    for authentication, the OPA instances for (GraphQL) authorization, and GraphQL itself. Finally, the GSO API
    router is included under the ``/api`` prefix.

    :return: The fully configured application instance.
    """
    gso_app = OrchestratorCore(base_settings=app_settings)

    # Authentication and authorization hooks; the registration order is kept exactly as it was.
    gso_app.register_authentication(oidc_instance)
    gso_app.register_authorization(opa_instance)
    gso_app.register_graphql_authorization(graphql_opa_instance)

    # GraphQL and the REST router.
    gso_app.register_graphql()
    gso_app.include_router(api_router, prefix="/api")

    return gso_app
def init_worker_app() -> OrchestratorCore:
    """Create the :term:`GSO` application instance used when running as a Celery worker.

    Only a bare ``OrchestratorCore`` built from ``app_settings`` is created; nothing else is registered on it here.

    :return: The application instance for the worker.
    """
    worker_app = OrchestratorCore(base_settings=app_settings)
    return worker_app
def init_cli_app() -> typer.Typer:
    """Run :term:`GSO` as a CLI application.

    The ``import-cli`` and ``netbox-cli`` command groups from ``gso.cli`` are attached to the orchestrator's CLI app,
    after which that app is invoked.

    :return: Whatever invoking the CLI app returns.
    """
    # Deliberately imported inside the function rather than at module level (hence the PLC0415 suppression).
    from gso.cli import imports, netbox  # noqa: PLC0415

    gso_sub_apps = (
        (imports.app, "import-cli"),
        (netbox.app, "netbox-cli"),
    )
    for sub_app, group_name in gso_sub_apps:
        cli_app.add_typer(sub_app, name=group_name)

    # NOTE(review): this *calls* the Typer app instead of returning the app object itself, so the ``typer.Typer``
    # return annotation looks inaccurate - confirm which of the two is intended before changing either.
    return cli_app()
def init_sentry() -> None:
    """Initialise Sentry, unless running in testing mode.

    Nothing happens when the ``TESTING`` environment variable is set to anything other than ``false``
    (case-insensitive; an unset variable counts as ``false``), or when the parameters returned by
    ``load_oss_params()`` have no truthy ``SENTRY`` section.

    Otherwise ``sentry_sdk.init`` is called with the configured DSN, the environment name taken from the ``GENERAL``
    section of the same parameters, and a traces sample rate of ``1.0``.
    """
    # Check the environment first, so that the parameters are never loaded while in testing mode.
    if os.getenv("TESTING", "false").lower() != "false":
        return

    # Load the parameters only once, so that the DSN and the environment name come from the same load.
    oss_params = load_oss_params()
    sentry_params = oss_params.SENTRY
    if not sentry_params:
        return

    sentry_sdk.init(dsn=sentry_params.DSN, environment=oss_params.GENERAL.environment, traces_sample_rate=1.0)
# Module-level call: Sentry initialisation is attempted as a side effect of importing this module. ``init_sentry``
# itself decides whether anything is actually initialised.
init_sentry()