Skip to content
Snippets Groups Projects
Commit 3ccf99d0 authored by Mohammad Torkashvand's avatar Mohammad Torkashvand
Browse files

Add celery worker and celery beat

parent c530567b
No related branches found
No related tags found
No related merge requests found
Pipeline #84456 failed
This commit is part of merge request !101. Comments created here will be created in the context of that merge request.
Showing
with 1489 additions and 2 deletions
...@@ -15,9 +15,12 @@ RUN pip install \ ...@@ -15,9 +15,12 @@ RUN pip install \
# Set the environment variable for the translations directory # Set the environment variable for the translations directory
ENV TRANSLATIONS_DIR=/app/gso/translations/ ENV TRANSLATIONS_DIR=/app/gso/translations/
COPY --chmod=755 entrypoint.sh /app/entrypoint.sh # Copy the shell scripts and ensure scripts do not have Windows line endings and make them executable
COPY start-app.sh start-worker.sh start-scheduler.sh /app/
RUN sed -i 's/\r$//' start-app.sh start-worker.sh start-scheduler.sh && \
chmod 755 start-app.sh start-worker.sh start-scheduler.sh
RUN chown -R appuser:appgroup /app RUN chown -R appuser:appgroup /app
USER appuser USER appuser
EXPOSE 8080 EXPOSE 8080
ENTRYPOINT ["/app/entrypoint.sh"] ENTRYPOINT ["/app/start-app.sh"]
import typer
from orchestrator import OrchestratorCore
from orchestrator.cli.main import app as cli_app
from orchestrator.settings import AppSettings
import gso.products # noqa: F401
import gso.workflows # noqa: F401
from gso.api import router as api_router
from gso.cli import netbox
base_settings = AppSettings() # TODO check if this is correct
def init_gso_app() -> OrchestratorCore:
app = OrchestratorCore(base_settings=base_settings)
app.include_router(api_router, prefix="/api")
return app
def init_worker_app() -> OrchestratorCore:
return OrchestratorCore(base_settings=base_settings)
def init_cli_app() -> typer.Typer:
from gso.cli import import_sites
cli_app.add_typer(import_sites.app, name="import_sites")
cli_app.add_typer(netbox.app, name="netbox-cli")
return cli_app()
# A generic, single database configuration.
[alembic]
# template used to generate migration files
file_template = %%(year)d-%%(month).2d-%%(day).2d_%%(rev)s_%%(slug)s
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
script_location = migrations
version_locations = %(here)s/migrations/versions
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
from fastapi import APIRouter
from gso.api.v1 import router as router_v1
router = APIRouter()
router.include_router(router_v1, prefix="/v1")
from fastapi import APIRouter
from gso.api.v1.imports import router as imports_router
from gso.api.v1.subscriptions import router as subscriptions_router
router = APIRouter()
router.include_router(imports_router)
router.include_router(subscriptions_router)
import ipaddress
from typing import Any
from uuid import UUID
from fastapi import Depends, HTTPException, status
from fastapi.routing import APIRouter
from orchestrator.security import opa_security_default
from orchestrator.services import processes
from pydantic import BaseModel, root_validator, validator
from pydantic.fields import ModelField
from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity
from gso.products.product_blocks.router import RouterRole, RouterVendor
from gso.products.product_blocks.site import SiteTier
from gso.services import subscriptions
from gso.services.crm import CustomerNotFoundError, get_customer_by_name
from gso.utils.helpers import (
LAGMember,
validate_country_code,
validate_ipv4_or_ipv6,
validate_site_fields_is_unique,
validate_site_name,
)
router = APIRouter(prefix="/imports", tags=["Imports"], dependencies=[Depends(opa_security_default)])
class ImportResponseModel(BaseModel):
pid: UUID
detail: str
class SiteImportModel(BaseModel):
site_name: str
site_city: str
site_country: str
site_country_code: str
site_latitude: float
site_longitude: float
site_bgp_community_id: int
site_internal_id: int
site_tier: SiteTier
site_ts_address: str
customer: str
@validator("site_ts_address", allow_reuse=True)
def validate_ts_address(cls, site_ts_address: str) -> str:
validate_site_fields_is_unique("site_ts_address", site_ts_address)
validate_ipv4_or_ipv6(site_ts_address)
return site_ts_address
@validator("site_country_code", allow_reuse=True)
def country_code_must_exist(cls, country_code: str) -> str:
validate_country_code(country_code)
return country_code
@validator("site_internal_id", "site_bgp_community_id", allow_reuse=True)
def validate_unique_fields(cls, value: str, field: ModelField) -> str | int:
return validate_site_fields_is_unique(field.name, value)
@validator("site_name", allow_reuse=True)
def site_name_must_be_valid(cls, site_name: str) -> str:
"""Validate the site name.
The site name must consist of three uppercase letters (A-Z) followed
by an optional single digit (0-9).
"""
validate_site_fields_is_unique("site_name", site_name)
validate_site_name(site_name)
return site_name
class RouterImportModel(BaseModel):
customer: str
router_site: str
hostname: str
ts_port: int
router_vendor: RouterVendor
router_role: RouterRole
is_ias_connected: bool
router_lo_ipv4_address: ipaddress.IPv4Address
router_lo_ipv6_address: ipaddress.IPv6Address
router_lo_iso_address: str
router_si_ipv4_network: ipaddress.IPv4Network | None = None
router_ias_lt_ipv4_network: ipaddress.IPv4Network | None = None
router_ias_lt_ipv6_network: ipaddress.IPv6Network | None = None
class IptrunkImportModel(BaseModel):
customer: str
geant_s_sid: str
iptrunk_type: IptrunkType
iptrunk_description: str
iptrunk_speed: PhyPortCapacity
iptrunk_minimum_links: int
side_a_node_id: str
side_a_ae_iface: str
side_a_ae_geant_a_sid: str
side_a_ae_members: list[LAGMember]
side_b_node_id: str
side_b_ae_iface: str
side_b_ae_geant_a_sid: str
side_b_ae_members: list[LAGMember]
iptrunk_ipv4_network: ipaddress.IPv4Network
iptrunk_ipv6_network: ipaddress.IPv6Network
@classmethod
def _get_active_routers(cls) -> set[str]:
return {
str(router["subscription_id"])
for router in subscriptions.get_active_router_subscriptions(includes=["subscription_id"])
}
@validator("customer")
def check_if_customer_exists(cls, value: str) -> str:
try:
get_customer_by_name(value)
except CustomerNotFoundError:
raise ValueError(f"Customer {value} not found")
return value
@validator("side_a_node_id", "side_b_node_id")
def check_if_router_side_is_available(cls, value: str) -> str:
if value not in cls._get_active_routers():
raise ValueError(f"Router {value} not found")
return value
@validator("side_a_ae_members", "side_b_ae_members")
def check_side_uniqueness(cls, value: list[str]) -> list[str]:
if len(value) != len(set(value)):
raise ValueError("Items must be unique")
return value
@root_validator
def check_members(cls, values: dict[str, Any]) -> dict[str, Any]:
min_links = values["iptrunk_minimum_links"]
side_a_members = values.get("side_a_ae_members", [])
side_b_members = values.get("side_b_ae_members", [])
len_a = len(side_a_members)
len_b = len(side_b_members)
if len_a < min_links:
raise ValueError(f"Side A members should be at least {min_links} (iptrunk_minimum_links)")
if len_a != len_b:
raise ValueError("Mismatch between Side A and B members")
return values
def _start_process(process_name: str, data: dict) -> UUID:
"""Start a process and handle common exceptions."""
pid: UUID = processes.start_process(process_name, [data])
if pid is None:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to start the process.")
process = processes._get_process(pid)
if process.last_status == "failed":
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Process {pid} failed because of an internal error. {process.failed_reason}",
)
return pid
@router.post("/sites", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def import_site(site: SiteImportModel) -> dict[str, Any]:
"""Import a site by running the import_site workflow.
:param site: The site information to be imported.
:type site: SiteImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If the site already exists or if there's an error in the process.
"""
pid = _start_process("import_site", site.dict())
return {"detail": "Site added successfully.", "pid": pid}
@router.post("/routers", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def import_router(router_data: RouterImportModel) -> dict[str, Any]:
"""Import a router by running the import_router workflow.
:param router_data: The router information to be imported.
:type router_data: RouterImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
"""
pid = _start_process("import_router", router_data.dict())
return {"detail": "Router added successfully", "pid": pid}
@router.post("/iptrunks", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def import_iptrunk(iptrunk_data: IptrunkImportModel) -> dict[str, Any]:
"""Import an iptrunk by running the import_iptrunk workflow.
:param iptrunk_data: The iptrunk information to be imported.
:type iptrunk_data: IptrunkImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
"""
pid = _start_process("import_iptrunk", iptrunk_data.dict())
return {"detail": "Iptrunk added successfully", "pid": pid}
from typing import Any
from fastapi import Depends, status
from fastapi.routing import APIRouter
from orchestrator.domain import SubscriptionModel
from orchestrator.schemas import SubscriptionDomainModelSchema
from orchestrator.security import opa_security_default
from orchestrator.services.subscriptions import build_extended_domain_model
from gso.services.subscriptions import get_active_router_subscriptions
router = APIRouter(prefix="/subscriptions", tags=["Subscriptions"], dependencies=[Depends(opa_security_default)])
@router.get("/routers", status_code=status.HTTP_200_OK, response_model=list[SubscriptionDomainModelSchema])
def subscription_routers() -> list[dict[str, Any]]:
"""Retrieve all active routers subscriptions."""
subscriptions = []
for r in get_active_router_subscriptions():
subscription = SubscriptionModel.from_subscription(r["subscription_id"])
extended_model = build_extended_domain_model(subscription)
subscriptions.append(extended_model)
return subscriptions
import typer
app: typer.Typer = typer.Typer()
@app.command()
def import_sites() -> None:
"""Import sites from a source."""
# TODO: Implement this CLI command to import sites from a source.
typer.echo("Importing sites...")
import typer
from pynetbox import RequestError
from gso.services.netbox_client import NetboxClient
from gso.utils.device_info import DEFAULT_SITE, ROUTER_ROLE
app: typer.Typer = typer.Typer()
@app.command()
def netbox_initial_setup() -> None:
"""Set up NetBox for the first time.
It includes:
- Creating a default site (GÉANT)
- Creating device roles (Router)
"""
typer.echo("Initial setup of NetBox ...")
typer.echo("Connecting to NetBox ...")
nbclient = NetboxClient()
typer.echo("Creating GÉANT site ...")
try:
nbclient.create_device_site(DEFAULT_SITE["name"], DEFAULT_SITE["slug"])
typer.echo("Site created successfully.")
except RequestError as e:
typer.echo(f"Error creating site: {e}")
typer.echo("Creating Router device role ...")
try:
nbclient.create_device_role(ROUTER_ROLE["name"], ROUTER_ROLE["slug"])
typer.echo("Device role created successfully.")
except RequestError as e:
typer.echo(f"Error creating device role: {e}")
typer.echo("NetBox initial setup completed successfully.")
"""The main module that runs :term:`GSO`."""
from gso import init_cli_app, init_gso_app
app = init_gso_app()
if __name__ == "__main__":
init_cli_app()
import logging
from alembic import context
from orchestrator.db.database import BaseModel
from orchestrator.settings import app_settings
from sqlalchemy import engine_from_config, pool, text
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
logger = logging.getLogger("alembic.env")
config.set_main_option("sqlalchemy.url", app_settings.DATABASE_URI)
target_metadata = BaseModel.metadata
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url, target_metadata=target_metadata, literal_binds=True, dialect_opts={"paramstyle": "named"}
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
# this callback is used to prevent an auto-migration from being generated
# when there are no changes to the schema
# reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html
def process_revision_directives(context, revision, directives): # type: ignore[no-untyped-def]
if getattr(config.cmd_opts, "autogenerate", False):
script = directives[0]
if script.upgrade_ops.is_empty():
directives[:] = []
logger.info("No changes in schema detected.")
config_section = config.get_section(config.config_ini_section)
if config_section is None:
raise ValueError("Config section not found!")
engine = engine_from_config(
config_section, prefix="sqlalchemy.", poolclass=pool.NullPool
)
connection = engine.connect()
context.configure(
connection=connection,
target_metadata=target_metadata,
process_revision_directives=process_revision_directives,
compare_type=True,
)
try:
with context.begin_transaction():
connection.execute(text("SELECT pg_advisory_xact_lock(1000);"))
context.run_migrations()
finally:
connection.execute(text("SELECT pg_advisory_unlock(1000);"))
connection.close()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
from orchestrator.migrations.helpers import *
# Write your own helper functions below this line.
"""${message}.
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
import sqlalchemy as sa
from alembic import op
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}
"""Add Router workflows.
Revision ID: 3657611f0dfc
Revises: 91047dd30b40
Create Date: 2023-08-14 15:44:25.616608
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = '3657611f0dfc'
down_revision = '91047dd30b40'
branch_labels = None
depends_on = None
from orchestrator.migrations.helpers import create_workflow, delete_workflow
new_workflows = [
{
"name": "create_router",
"target": "CREATE",
"description": "Create router",
"product_type": "Router"
},
{
"name": "terminate_router",
"target": "TERMINATE",
"description": "Terminate router",
"product_type": "Router"
}
]
def upgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
create_workflow(conn, workflow)
def downgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
delete_workflow(conn, workflow["name"])
"""Add Site workflows.
Revision ID: 91047dd30b40
Revises: 97436160a422
Create Date: 2023-08-14 15:42:35.450032
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = '91047dd30b40'
down_revision = '97436160a422'
branch_labels = None
depends_on = None
from orchestrator.migrations.helpers import create_workflow, delete_workflow
new_workflows = [
{
"name": "create_site",
"target": "CREATE",
"description": "Create Site",
"product_type": "Site"
}
]
def upgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
create_workflow(conn, workflow)
def downgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
delete_workflow(conn, workflow["name"])
This diff is collapsed.
"""Add IP Trunk workflows.
Revision ID: a6eefd32c4f7
Revises: 3657611f0dfc
Create Date: 2023-08-14 15:50:03.376997
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = 'a6eefd32c4f7'
down_revision = '3657611f0dfc'
branch_labels = None
depends_on = None
from orchestrator.migrations.helpers import create_workflow, delete_workflow
new_workflows = [
{
"name": "create_iptrunk",
"target": "CREATE",
"description": "Create IP trunk",
"product_type": "Iptrunk"
},
{
"name": "terminate_iptrunk",
"target": "TERMINATE",
"description": "Terminate IPtrunk",
"product_type": "Iptrunk"
},
{
"name": "modify_trunk_interface",
"target": "MODIFY",
"description": "Modify IP Trunk interface",
"product_type": "Iptrunk"
},
{
"name": "modify_isis_metric",
"target": "MODIFY",
"description": "Modify IP trunk",
"product_type": "Iptrunk"
}
]
def upgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
create_workflow(conn, workflow)
def downgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
delete_workflow(conn, workflow["name"])
"""Add IP Trunk migration workflow.
Revision ID: e68720f2ec32
Revises: a6eefd32c4f7
Create Date: 2023-08-16 14:48:00.227803
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = 'e68720f2ec32'
down_revision = 'a6eefd32c4f7'
branch_labels = None
depends_on = None
from orchestrator.migrations.helpers import create_workflow, delete_workflow
new_workflows = [
{
"name": "migrate_iptrunk",
"target": "MODIFY",
"description": "Migrate an IP Trunk",
"product_type": "Iptrunk"
}
]
def upgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
create_workflow(conn, workflow)
def downgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
delete_workflow(conn, workflow["name"])
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment