Skip to content
Snippets Groups Projects
Verified Commit f9022eb1 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Merge remote-tracking branch 'origin/feature/update-input-form-validation'...

Merge remote-tracking branch 'origin/feature/update-input-form-validation' into feature/update-input-form-validation
parents 9818590e 9c3c6f8d
No related branches found
No related tags found
No related merge requests found
Showing
with 626 additions and 5 deletions
......@@ -13,9 +13,16 @@ run-tox-pipeline:
- docker-executor
image: python:3.10
services:
- postgres:15.4
# Change pip's cache directory to be inside the project directory since we can
# only cache local items.
variables:
POSTGRES_DB: gso-test-db
POSTGRES_USER: nwa
POSTGRES_PASSWORD: nwa
DATABASE_URI_TEST: 'postgresql://nwa:nwa@postgres:5432/gso-test-db'
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
# Pip's cache doesn't store the python packages
......
from typer import Typer
def load_gso_cli(app: Typer) -> None:
from gso.cli import import_sites
app.add_typer(import_sites.app, name="import_sites")
"""Module that implements process related API endpoints."""
from fastapi.param_functions import Depends
from fastapi.routing import APIRouter
from orchestrator.security import opa_security_default
from gso.api.api_v1.endpoints import imports
api_router = APIRouter()
api_router.include_router(imports.router, prefix="/imports", dependencies=[Depends(opa_security_default)])
import ipaddress
from typing import Any, Dict, Optional
from uuid import UUID
from fastapi import HTTPException, status
from fastapi.routing import APIRouter
from orchestrator.services import processes, subscriptions
from pydantic import BaseModel
from sqlalchemy.exc import MultipleResultsFound
from gso.products.product_blocks.router import RouterRole, RouterVendor
from gso.products.product_blocks.site import SiteTier
router = APIRouter()
def start_process(process_name: str, data: dict) -> UUID:
"""Start a process and handle common exceptions."""
pid: UUID = processes.start_process(process_name, [data])
if pid is None:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to start the process.")
process = processes._get_process(pid)
if process.last_status == "failed":
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Process {pid} failed because of an internal error. {process.failed_reason}",
)
return pid
class SiteImport(BaseModel):
site_name: str
site_city: str
site_country: str
site_country_code: str
site_latitude: float
site_longitude: float
site_bgp_community_id: int
site_internal_id: int
site_tier: SiteTier
site_ts_address: str
customer: str
@router.post("/sites", status_code=status.HTTP_201_CREATED, tags=["Import"])
def import_site(site: SiteImport) -> Dict[str, Any]:
"""Import site by running the import_site workflow.
Args:
----
site (SiteImport): The site information to be imported.
Returns:
-------
dict: A dictionary containing the process id of the started process and detail message.
Raises:
------
HTTPException: If the site already exists or if there's an error in the process.
"""
try:
subscription = subscriptions.retrieve_subscription_by_subscription_instance_value(
resource_type="site_name", value=site.site_name, sub_status=("provisioning", "active")
)
if subscription:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Site already exists.")
except MultipleResultsFound:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Multiple subscriptions found.")
pid = start_process("import_site", site.dict())
return {"detail": "Site added successfully.", "pid": pid}
class RouterImportModel(BaseModel):
customer: str
router_site: str
hostname: str
ts_port: int
router_vendor: RouterVendor
router_role: RouterRole
is_ias_connected: bool
router_lo_ipv4_address: ipaddress.IPv4Address
router_lo_ipv6_address: ipaddress.IPv6Address
router_lo_iso_address: str
router_si_ipv4_network: Optional[ipaddress.IPv4Network] = None
router_ias_lt_ipv4_network: Optional[ipaddress.IPv4Network] = None
router_ias_lt_ipv6_network: Optional[ipaddress.IPv6Network] = None
@router.post("/routers", status_code=status.HTTP_201_CREATED, tags=["Import"])
def import_router(router_data: RouterImportModel) -> Dict[str, Any]:
"""Import a router by running the import_router workflow.
Args:
----
router_data (RouterImportModel): The router information to be imported.
Returns:
-------
dict: A dictionary containing the process id of the started process and detail message.
Raises:
------
HTTPException: If there's an error in the process.
"""
pid = start_process("import_router", router_data.dict())
return {"detail": "Router added successfully", "pid": pid}
import typer
app: typer.Typer = typer.Typer()
@app.command()
def import_sites() -> None:
"""Import sites from a source."""
# TODO: Implement this CLI command to import sites from a source.
typer.echo("Importing sites...")
"""The main module that runs {term}`GSO`."""
import typer
from orchestrator import OrchestratorCore
from orchestrator.cli.main import app as core_cli
from orchestrator.settings import AppSettings
import gso.products # noqa: F401
import gso.workflows # noqa: F401
from gso import load_gso_cli
from gso.api.api_v1.api import api_router
app = OrchestratorCore(base_settings=AppSettings())
def init_gso_app(settings: AppSettings) -> OrchestratorCore:
app = OrchestratorCore(base_settings=settings)
app.include_router(api_router, prefix="/api")
return app
def init_cli_app() -> typer.Typer:
load_gso_cli(core_cli)
return core_cli()
app = init_gso_app(settings=AppSettings())
if __name__ == "__main__":
core_cli()
init_cli_app()
from typing import Optional
from typing import Any, Dict
class CustomerNotFoundError(Exception):
"""Exception raised when a customer is not found."""
pass
def all_customers() -> list[dict]:
......@@ -10,9 +16,9 @@ def all_customers() -> list[dict]:
]
def get_customer_by_name(name: str) -> Optional[dict]:
def get_customer_by_name(name: str) -> Dict[str, Any]:
for customer in all_customers():
if customer["name"] == name:
return customer
return None
raise CustomerNotFoundError(f"Customer {name} not found")
from orchestrator.db import (
ProductTable,
ResourceTypeTable,
SubscriptionInstanceTable,
SubscriptionInstanceValueTable,
SubscriptionTable,
)
from gso.products.product_types.site import Site
def get_site_by_name(site_name: str) -> Site:
"""Get a site by its name.
Args:
----
site_name (str): The name of the site.
"""
subscription = (
SubscriptionTable.query.join(
ProductTable, SubscriptionInstanceTable, SubscriptionInstanceValueTable, ResourceTypeTable
)
.filter(SubscriptionInstanceValueTable.value == site_name)
.filter(ResourceTypeTable.resource_type == "site_name")
.filter(SubscriptionTable.status == "active")
.first()
)
if not subscription:
raise ValueError(f"Site with name {site_name} not found.")
return Site.from_subscription(subscription.subscription_id)
......@@ -8,3 +8,5 @@ LazyWorkflowInstance("gso.workflows.iptrunk.modify_trunk_interface", "modify_tru
LazyWorkflowInstance("gso.workflows.iptrunk.terminate_iptrunk", "terminate_iptrunk")
LazyWorkflowInstance("gso.workflows.iptrunk.modify_isis_metric", "modify_isis_metric")
LazyWorkflowInstance("gso.workflows.site.create_site", "create_site")
LazyWorkflowInstance("gso.workflows.tasks.import_site", "import_site")
LazyWorkflowInstance("gso.workflows.tasks.import_router", "import_router")
import ipaddress
from typing import Optional
from uuid import UUID
from orchestrator import workflow
from orchestrator.db import ProductTable
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle
from orchestrator.workflow import StepList, done, init, step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from gso.products.product_blocks import router as router_pb
from gso.products.product_blocks.router import RouterRole, RouterVendor
from gso.products.product_types import router
from gso.products.product_types.router import RouterInactive
from gso.services.crm import get_customer_by_name
from gso.services.subscriptions import get_site_by_name
@step("Create subscription")
def create_subscription(customer: str) -> State:
customer_id: UUID = get_customer_by_name(customer)["id"]
product_id: UUID = ProductTable.query.filter_by(name="Router").first().product_id
subscription = RouterInactive.from_product_id(product_id, customer_id)
return {
"subscription": subscription,
"subscription_id": subscription.subscription_id,
}
def initial_input_form_generator() -> FormGenerator:
class ImportRouter(FormPage):
class Config:
title = "Import Router"
customer: str
router_site: str
hostname: str
ts_port: int
router_vendor: RouterVendor
router_role: RouterRole
is_ias_connected: bool
router_lo_ipv4_address: ipaddress.IPv4Address
router_lo_ipv6_address: ipaddress.IPv6Address
router_lo_iso_address: str
router_si_ipv4_network: Optional[ipaddress.IPv4Network] = None
router_ias_lt_ipv4_network: Optional[ipaddress.IPv4Network] = None
router_ias_lt_ipv6_network: Optional[ipaddress.IPv6Network] = None
user_input = yield ImportRouter
return user_input.dict()
@step("Initialize subscription")
def initialize_subscription(
subscription: RouterInactive,
hostname: str,
ts_port: int,
router_vendor: router_pb.RouterVendor,
router_site: str,
router_role: router_pb.RouterRole,
is_ias_connected: Optional[bool] = None,
router_lo_ipv4_address: Optional[ipaddress.IPv4Address] = None,
router_lo_ipv6_address: Optional[ipaddress.IPv6Address] = None,
router_lo_iso_address: Optional[str] = None,
router_si_ipv4_network: Optional[ipaddress.IPv4Network] = None,
router_ias_lt_ipv4_network: Optional[ipaddress.IPv4Network] = None,
router_ias_lt_ipv6_network: Optional[ipaddress.IPv6Network] = None,
) -> State:
subscription.router.router_ts_port = ts_port
subscription.router.router_vendor = router_vendor
subscription.router.router_site = get_site_by_name(router_site).site
fqdn = (
f"{hostname}.{subscription.router.router_site.site_name.lower()}."
f"{subscription.router.router_site.site_country_code.lower()}"
".geant.net"
)
subscription.router.router_fqdn = fqdn
subscription.router.router_role = router_role
subscription.router.router_access_via_ts = True
subscription.description = f"Router {fqdn}"
subscription.router.router_is_ias_connected = is_ias_connected
subscription.router.router_lo_ipv4_address = router_lo_ipv4_address
subscription.router.router_lo_ipv6_address = router_lo_ipv6_address
subscription.router.router_lo_iso_address = router_lo_iso_address
subscription.router.router_si_ipv4_network = router_si_ipv4_network
subscription.router.router_ias_lt_ipv4_network = router_ias_lt_ipv4_network
subscription.router.router_ias_lt_ipv6_network = router_ias_lt_ipv6_network
subscription = router.RouterProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING)
return {"subscription": subscription}
@workflow(
"Import router",
initial_input_form=initial_input_form_generator,
target=Target.SYSTEM,
)
def import_router() -> StepList:
return (
init
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
from uuid import UUID
from orchestrator.db.models import ProductTable
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from gso.products.product_blocks.site import SiteTier
from gso.products.product_types import site
from gso.services.crm import get_customer_by_name
from gso.workflows.site.create_site import initialize_subscription
@step("Create subscription")
def create_subscription(customer: str) -> State:
customer_id: UUID = get_customer_by_name(customer)["id"]
product_id: UUID = ProductTable.query.filter_by(product_type="Site").first().product_id
subscription = site.SiteInactive.from_product_id(product_id, customer_id)
return {
"subscription": subscription,
"subscription_id": subscription.subscription_id,
}
def generate_initial_input_form() -> FormGenerator:
class ImportSite(FormPage):
class Config:
title = "Import Site"
site_name: str
site_city: str
site_country: str
site_country_code: str
site_latitude: float
site_longitude: float
site_bgp_community_id: int
site_internal_id: int
site_tier: SiteTier
site_ts_address: str
customer: str
user_input = yield ImportSite
return user_input.dict()
@workflow(
"Import Site",
target=Target.SYSTEM,
initial_input_form=generate_initial_input_form,
)
def import_site() -> StepList:
"""Workflow to import a site without provisioning it."""
return (
init
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
......@@ -4,6 +4,7 @@ requests
pycountry
pytest
faker
responses
black
isort
......@@ -12,3 +13,5 @@ mypy
ruff
sphinx
sphinx-rtd-theme
requests
typer
\ No newline at end of file
......@@ -3,8 +3,21 @@ import json
import os
import socket
import tempfile
from pathlib import Path
import orchestrator
import pytest
from alembic import command
from alembic.config import Config
from orchestrator import app_settings
from orchestrator.db import Database, db
from orchestrator.db.database import ENGINE_ARGUMENTS, SESSION_ARGUMENTS, BaseModel
from sqlalchemy import create_engine
from sqlalchemy.engine import make_url
from sqlalchemy.orm import scoped_session, sessionmaker
from starlette.testclient import TestClient
from gso.main import init_gso_app
@pytest.fixture(scope="session")
......@@ -74,3 +87,128 @@ def data_config_filename(configuration_data) -> str:
os.environ["OSS_PARAMS_FILENAME"] = f.name
yield f.name
@pytest.fixture(scope="session")
def db_uri():
"""Provide the database uri configuration to run the migration on."""
return os.environ.get("DATABASE_URI_TEST", "postgresql://nwa:nwa@localhost/gso-test-db")
def run_migrations(db_uri: str) -> None:
"""Configure the alembic migration and run the migration on the database.
Args:
----
db_uri: The database uri configuration to run the migration on.
Returns:
-------
None
"""
path = Path(__file__).resolve().parent
app_settings.DATABASE_URI = db_uri
alembic_cfg = Config(file_=path / "../gso/alembic.ini")
alembic_cfg.set_main_option("sqlalchemy.url", db_uri)
alembic_cfg.set_main_option("script_location", str(path / "../gso/migrations"))
version_locations = alembic_cfg.get_main_option("version_locations")
alembic_cfg.set_main_option(
"version_locations", f"{version_locations} {os.path.dirname(orchestrator.__file__)}/migrations/versions/schema"
)
command.upgrade(alembic_cfg, "heads")
@pytest.fixture(scope="session")
def database(db_uri):
"""Create database and run migrations and cleanup after wards.
Args:
----
db_uri: The database uri configuration to run the migration on.
"""
db.update(Database(db_uri))
url = make_url(db_uri)
db_to_create = url.database
url = url.set(database="postgres")
engine = create_engine(url)
with engine.connect() as conn:
conn.execute("COMMIT;")
conn.execute(f"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='{db_to_create}';") # noqa
conn.execute(f'DROP DATABASE IF EXISTS "{db_to_create}";') # noqa
conn.execute("COMMIT;")
conn.execute(f'CREATE DATABASE "{db_to_create}";') # noqa
run_migrations(db_uri)
db.wrapped_database.engine = create_engine(db_uri, **ENGINE_ARGUMENTS)
try:
yield
finally:
db.wrapped_database.engine.dispose()
with engine.connect() as conn:
conn.execute("COMMIT;")
# Terminate all connections to the database
conn.execute(
f"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='{db_to_create}';" # noqa
)
conn.execute(f'DROP DATABASE IF EXISTS "{db_to_create}";') # noqa
@pytest.fixture(autouse=True)
def db_session(database):
"""Ensure that tests are executed within a transactional scope that automatically rolls back after completion.
This fixture facilitates a pattern known as 'transactional tests'. At the start, it establishes a connection and
begins an overarching transaction. Any database operations performed within the test function—whether they commit
or not happen within the context of this master transaction.
From the perspective of the test function, it seems as though changes are getting committed to the database,
enabling the tests to query and assert the persistence of data. Yet, once the test completes, this fixture
intervenes to roll back the master transaction. This ensures a clean slate after each test, preventing tests from
polluting the database state for subsequent tests.
Benefits:
- Each test runs in isolation with a pristine database state.
- Avoids the overhead of recreating the database schema or re-seeding data between tests.
Args:
----
database: A fixture reference that initializes the database.
"""
with contextlib.closing(db.wrapped_database.engine.connect()) as test_connection:
# Create a new session factory for this context.
session_factory = sessionmaker(bind=test_connection, **SESSION_ARGUMENTS)
scoped_session_instance = scoped_session(session_factory, scopefunc=db.wrapped_database._scopefunc)
# Point the database session to this new scoped session.
db.wrapped_database.session_factory = session_factory
db.wrapped_database.scoped_session = scoped_session_instance
# Set the query for the base model.
BaseModel.set_query(scoped_session_instance.query_property())
transaction = test_connection.begin()
try:
yield
finally:
transaction.rollback()
scoped_session_instance.remove()
@pytest.fixture(scope="session", autouse=True)
def fastapi_app(database, db_uri):
"""Load the GSO FastAPI app for testing purposes."""
app_settings.DATABASE_URI = db_uri
return init_gso_app(settings=app_settings)
@pytest.fixture(scope="session")
def test_client(fastapi_app):
return TestClient(fastapi_app)
import pytest
from faker import Faker
from orchestrator.db import SubscriptionTable
from orchestrator.services import subscriptions
from gso.products.product_blocks.router import RouterRole, RouterVendor
from gso.products.product_blocks.site import SiteTier
class TestImportEndpoints:
@pytest.fixture(autouse=True)
def setup(self, test_client):
self.faker = Faker()
self.client = test_client
self.site_import_endpoint = "/api/imports/sites"
self.router_import_endpoint = "/api/imports/routers"
self.site_data = {
"site_name": self.faker.name(),
"site_city": self.faker.city(),
"site_country": self.faker.country(),
"site_country_code": self.faker.country_code(),
"site_latitude": float(self.faker.latitude()),
"site_longitude": float(self.faker.longitude()),
"site_bgp_community_id": self.faker.pyint(),
"site_internal_id": self.faker.pyint(),
"site_tier": SiteTier.TIER1,
"site_ts_address": self.faker.ipv4(),
"customer": "Geant",
}
self.router_data = {
"hostname": "127.0.0.1",
"router_role": RouterRole.PE,
"router_vendor": RouterVendor.JUNIPER,
"router_site": self.site_data["site_name"],
"ts_port": 1234,
"customer": "Geant",
"is_ias_connected": True,
"router_lo_ipv4_address": self.faker.ipv4(),
"router_lo_ipv6_address": self.faker.ipv6(),
"router_lo_iso_address": "TestAddress",
}
def test_import_site_endpoint(self):
assert SubscriptionTable.query.all() == []
# Post data to the endpoint
response = self.client.post(self.site_import_endpoint, json=self.site_data)
assert response.status_code == 201
assert "detail" in response.json()
assert "pid" in response.json()
subscription = subscriptions.retrieve_subscription_by_subscription_instance_value(
resource_type="site_name", value=self.site_data["site_name"]
)
assert subscription is not None
self.site_data.pop("customer")
def test_import_site_endpoint_with_existing_site(self):
response = self.client.post(self.site_import_endpoint, json=self.site_data)
assert SubscriptionTable.query.count() == 1
assert response.status_code == 201
response = self.client.post(self.site_import_endpoint, json=self.site_data)
assert response.status_code == 409
assert SubscriptionTable.query.count() == 1
def test_import_site_endpoint_with_invalid_data(self):
# invalid data, missing site_latitude and invalid site_longitude
self.site_data.pop("site_latitude")
self.site_data["site_longitude"] = "invalid"
assert SubscriptionTable.query.count() == 0
response = self.client.post(self.site_import_endpoint, json=self.site_data)
assert response.status_code == 422
assert SubscriptionTable.query.count() == 0
response = response.json()
assert response["detail"][0]["loc"] == ["body", "site_latitude"]
assert response["detail"][0]["msg"] == "field required"
assert response["detail"][1]["loc"] == ["body", "site_longitude"]
assert response["detail"][1]["msg"] == "value is not a valid float"
def test_import_router_endpoint(self):
# Create a site first
response = self.client.post(self.site_import_endpoint, json=self.site_data)
assert response.status_code == 201
assert SubscriptionTable.query.count() == 1
response = self.client.post(self.router_import_endpoint, json=self.router_data)
assert response.status_code == 201
assert SubscriptionTable.query.count() == 2
def test_import_router_endpoint_with_invalid_data(self):
response = self.client.post(self.site_import_endpoint, json=self.site_data)
assert response.status_code == 201
assert SubscriptionTable.query.count() == 1
# invalid data, missing hostname and invalid router_lo_ipv6_address
self.router_data.pop("hostname")
self.router_data["router_lo_ipv6_address"] = "invalid"
response = self.client.post(self.router_import_endpoint, json=self.router_data)
assert response.status_code == 422
assert SubscriptionTable.query.count() == 1
response = response.json()
assert response["detail"][0]["loc"] == ["body", "hostname"]
assert response["detail"][0]["msg"] == "field required"
assert response["detail"][1]["loc"] == ["body", "router_lo_ipv6_address"]
assert response["detail"][1]["msg"] == "value is not a valid IPv6 address"
......@@ -8,6 +8,7 @@ max-line-length = 120
ban-relative-imports = true
[testenv]
passenv = DATABASE_URI_TEST
deps =
coverage
flake8
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment