Skip to content
Snippets Groups Projects
Commit 05269a8b authored by Neda Moeini's avatar Neda Moeini
Browse files

Fixed linting errors.

parent 5474620f
No related branches found
No related tags found
1 merge request!60Feature/nat 217 import sites
Pipeline #83852 failed
...@@ -2,12 +2,9 @@ ...@@ -2,12 +2,9 @@
from fastapi.param_functions import Depends from fastapi.param_functions import Depends
from fastapi.routing import APIRouter from fastapi.routing import APIRouter
from orchestrator.security import opa_security_default from orchestrator.security import opa_security_default
from gso.api.api_v1.endpoints import imports from gso.api.api_v1.endpoints import imports
api_router = APIRouter() api_router = APIRouter()
api_router.include_router( api_router.include_router(imports.router, prefix="/imports", dependencies=[Depends(opa_security_default)])
imports.router, prefix="/imports", dependencies=[Depends(opa_security_default)]
)
\ No newline at end of file
import ipaddress import ipaddress
from typing import Dict, Any, Optional from typing import Any, Dict, Optional
from uuid import UUID from uuid import UUID
from fastapi import HTTPException, status from fastapi import HTTPException, status
...@@ -15,9 +15,9 @@ router = APIRouter() ...@@ -15,9 +15,9 @@ router = APIRouter()
def start_process(process_name: str, data: dict) -> UUID: def start_process(process_name: str, data: dict) -> UUID:
"""Utility function to start a process and handle common exceptions.""" """Start a process and handle common exceptions."""
pid = processes.start_process(process_name, [data]) pid: UUID = processes.start_process(process_name, [data])
if pid is None: if pid is None:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to start the process.") raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to start the process.")
...@@ -46,16 +46,19 @@ class SiteImport(BaseModel): ...@@ -46,16 +46,19 @@ class SiteImport(BaseModel):
@router.post("/sites", status_code=status.HTTP_201_CREATED, tags=["Import"]) @router.post("/sites", status_code=status.HTTP_201_CREATED, tags=["Import"])
def import_site(site: SiteImport) -> Dict[str, Any]: def import_site(site: SiteImport) -> Dict[str, Any]:
""" """Import site by running the import_site workflow.
Import site by running the import_site workflow.
Args: Args:
- site: A SiteImport object containing site details. ----
site (SiteImport): The site information to be imported.
Returns: Returns:
- A dictionary containing the detail message and the process id. -------
dict: A dictionary containing the process id of the started process and detail message.
Raises: Raises:
- HTTPException: If there's an error in the process or if the site already exists. ------
HTTPException: If the site already exists or if there's an error in the process.
""" """
try: try:
subscription = subscriptions.retrieve_subscription_by_subscription_instance_value( subscription = subscriptions.retrieve_subscription_by_subscription_instance_value(
...@@ -88,18 +91,21 @@ class RouterImportModel(BaseModel): ...@@ -88,18 +91,21 @@ class RouterImportModel(BaseModel):
@router.post("/routers", status_code=status.HTTP_201_CREATED, tags=["Import"]) @router.post("/routers", status_code=status.HTTP_201_CREATED, tags=["Import"])
def import_router(router_data: RouterImportModel): def import_router(router_data: RouterImportModel) -> Dict[str, Any]:
""" """Import a router by running the import_router workflow.
Import router by running the import_router workflow.
Args: Args:
- router_data: A RouterImportModel object containing router details. ----
router_data (RouterImportModel): The router information to be imported.
Returns: Returns:
- A dictionary containing the detail message and the process id. -------
dict: A dictionary containing the process id of the started process and detail message.
Raises: Raises:
- HTTPException: If there's an error in the process. ------
HTTPException: If there's an error in the process.
""" """
pid = start_process("import_router", router_data.dict()) pid = start_process("import_router", router_data.dict())
return {"detail": f"Router added successfully", "pid": pid} return {"detail": "Router added successfully", "pid": pid}
...@@ -4,7 +4,7 @@ app: typer.Typer = typer.Typer() ...@@ -4,7 +4,7 @@ app: typer.Typer = typer.Typer()
@app.command() @app.command()
def import_sites(): def import_sites() -> None:
"""Import sites from a source.""" """Import sites from a source."""
# TODO: Implement this CLI command to import sites from a source. # TODO: Implement this CLI command to import sites from a source.
typer.echo("Importing sites...") typer.echo("Importing sites...")
from typing import Optional from typing import Any, Dict
class CustomerNotFoundError(Exception): class CustomerNotFoundError(Exception):
"""Exception raised when a customer is not found.""" """Exception raised when a customer is not found."""
pass pass
...@@ -15,7 +16,7 @@ def all_customers() -> list[dict]: ...@@ -15,7 +16,7 @@ def all_customers() -> list[dict]:
] ]
def get_customer_by_name(name: str) -> Optional[dict]: def get_customer_by_name(name: str) -> Dict[str, Any]:
for customer in all_customers(): for customer in all_customers():
if customer["name"] == name: if customer["name"] == name:
return customer return customer
......
import ipaddress import ipaddress
from typing import Optional from typing import Optional
from uuid import UUID
from orchestrator import workflow from orchestrator import workflow
from orchestrator.db import ( from orchestrator.db import (
ProductTable, ProductTable,
SubscriptionTable,
SubscriptionInstanceValueTable,
SubscriptionInstanceTable,
ResourceTypeTable, ResourceTypeTable,
SubscriptionInstanceTable,
SubscriptionInstanceValueTable,
SubscriptionTable,
) )
from orchestrator.domain.base import S
from orchestrator.forms import FormPage from orchestrator.forms import FormPage
from orchestrator.targets import Target from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, UUIDstr, SubscriptionLifecycle from orchestrator.types import FormGenerator, State, SubscriptionLifecycle
from orchestrator.workflow import StepList, init, step, done from orchestrator.workflow import StepList, done, init, step
from orchestrator.workflows.steps import store_process_subscription, set_status, resync from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from gso.products import Site
from gso.products.product_blocks import router as router_pb from gso.products.product_blocks import router as router_pb
from gso.products.product_blocks.router import RouterVendor, RouterRole from gso.products.product_blocks.router import RouterRole, RouterVendor
from gso.products.product_types import router from gso.products.product_types import router
from gso.products.product_types.router import RouterInactive from gso.products.product_types.router import RouterInactive
from gso.products.product_types.site import Site
from gso.services.crm import get_customer_by_name from gso.services.crm import get_customer_by_name
@step("Create subscription") @step("Create subscription")
def create_subscription(customer: str) -> State: def create_subscription(customer: str) -> State:
customer_id: UUIDstr = get_customer_by_name(customer)["id"] customer_id: UUID = get_customer_by_name(customer)["id"]
product_id: UUIDstr = ProductTable.query.filter_by(name="Router").first().product_id product_id: UUID = ProductTable.query.filter_by(name="Router").first().product_id
subscription = RouterInactive.from_product_id(product_id, customer_id) subscription = RouterInactive.from_product_id(product_id, customer_id)
return { return {
...@@ -61,7 +61,7 @@ def initial_input_form_generator() -> FormGenerator: ...@@ -61,7 +61,7 @@ def initial_input_form_generator() -> FormGenerator:
return user_input.dict() return user_input.dict()
def get_site_by_name(site_name: str) -> S: def get_site_by_name(site_name: str) -> Site:
subscription = ( subscription = (
SubscriptionTable.query.join( SubscriptionTable.query.join(
ProductTable, SubscriptionInstanceTable, SubscriptionInstanceValueTable, ResourceTypeTable ProductTable, SubscriptionInstanceTable, SubscriptionInstanceValueTable, ResourceTypeTable
......
from uuid import uuid4 from uuid import UUID, uuid4
from orchestrator.db.models import ProductTable from orchestrator.db.models import ProductTable
from orchestrator.forms import FormPage from orchestrator.forms import FormPage
from orchestrator.targets import Target from orchestrator.targets import Target
from orchestrator.types import State, SubscriptionLifecycle, FormGenerator from orchestrator.types import FormGenerator, State, SubscriptionLifecycle
from orchestrator.workflow import StepList, done, init, step, workflow from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from pydantic.types import UUID
from gso.products.product_blocks.site import SiteTier from gso.products.product_blocks.site import SiteTier
from gso.products.product_types import site from gso.products.product_types import site
...@@ -50,15 +49,14 @@ def generate_initial_input_form() -> FormGenerator: ...@@ -50,15 +49,14 @@ def generate_initial_input_form() -> FormGenerator:
initial_input_form=generate_initial_input_form, initial_input_form=generate_initial_input_form,
) )
def import_site() -> StepList: def import_site() -> StepList:
""" """Workflow to import a site without provisioning it."""
Workflow to import a site without provisioning it.
"""
return ( return (
init init
>> create_subscription >> create_subscription
>> store_process_subscription(Target.CREATE) >> store_process_subscription(Target.CREATE)
>> initialize_subscription >> initialize_subscription
>> set_status(SubscriptionLifecycle.ACTIVE) >> set_status(SubscriptionLifecycle.ACTIVE)
>> resync >> resync
>> done >> done
) )
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment