Skip to content
Snippets Groups Projects
Verified Commit 6bba6d1d authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

start resolving mypy linting errors

parent 93f7dcbf
No related branches found
No related tags found
1 merge request!36Add linter tools, and resolve all linting errors
...@@ -25,7 +25,7 @@ class HostAddresses(BaseSettings): ...@@ -25,7 +25,7 @@ class HostAddresses(BaseSettings):
v6: ipaddress.IPv6Address v6: ipaddress.IPv6Address
def new_service_networks(service_type="", comment="", extattrs=None) -> ServiceNetworks: def new_service_networks(service_type: str = "", comment: str = "", extattrs=None) -> ServiceNetworks:
if extattrs is None: if extattrs is None:
extattrs = {} extattrs = {}
v4_service_network = _ipam.allocate_service_ipv4_network( v4_service_network = _ipam.allocate_service_ipv4_network(
...@@ -38,8 +38,8 @@ def new_service_networks(service_type="", comment="", extattrs=None) -> ServiceN ...@@ -38,8 +38,8 @@ def new_service_networks(service_type="", comment="", extattrs=None) -> ServiceN
def new_service_host( def new_service_host(
hostname, hostname: str,
service_type="", service_type: str = "",
service_networks: ServiceNetworks = None, service_networks: ServiceNetworks = None,
host_addresses: HostAddresses = None, host_addresses: HostAddresses = None,
cname_aliases=None, cname_aliases=None,
...@@ -58,13 +58,13 @@ def new_service_host( ...@@ -58,13 +58,13 @@ def new_service_host(
def delete_service_network( def delete_service_network(
network: ipaddress.ip_network = None, service_type="" network: ipaddress.ip_network = None, service_type: str = ""
) -> Union[V4ServiceNetwork, V6ServiceNetwork]: ) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
return _ipam.delete_service_network(ipnetwork=network, service_type=service_type) return _ipam.delete_service_network(ipnetwork=network, service_type=service_type)
def delete_service_host( def delete_service_host(
hostname="", host_addresses: HostAddresses = None, cname_aliases=None, service_type="" hostname: str = "", host_addresses: HostAddresses = None, cname_aliases=None, service_type: str = ""
) -> V4HostAddress | V6HostAddress: ) -> V4HostAddress | V6HostAddress:
if cname_aliases is None: if cname_aliases is None:
cname_aliases = [] cname_aliases = []
......
...@@ -3,14 +3,14 @@ import requests ...@@ -3,14 +3,14 @@ import requests
from gso import settings from gso import settings
def import_new_router(router_name, oss_params=settings.OSSParams): def import_new_router(router_name, oss_params=settings.OSSParams) -> None:
r = requests.post( r = requests.post(
f"{oss_params.RESOURCE_MANAGER_API_PREFIX}" f"/api/interfaces/initialize-router/{router_name}", timeout=10000 f"{oss_params.RESOURCE_MANAGER_API_PREFIX}" f"/api/interfaces/initialize-router/{router_name}", timeout=10000
) )
r.raise_for_status() r.raise_for_status()
def next_lag(router_name, oss_params=settings.OSSParams): def next_lag(router_name, oss_params=settings.OSSParams) -> dict:
r = requests.post( r = requests.post(
f"{oss_params.RESOURCE_MANAGER_API_PREFIX}" f"/api/interfaces/next-lag/{router_name}", timeout=10000 f"{oss_params.RESOURCE_MANAGER_API_PREFIX}" f"/api/interfaces/next-lag/{router_name}", timeout=10000
) )
...@@ -19,7 +19,7 @@ def next_lag(router_name, oss_params=settings.OSSParams): ...@@ -19,7 +19,7 @@ def next_lag(router_name, oss_params=settings.OSSParams):
return response["name"] return response["name"]
def next_physical(router_name, lag_name, oss_params=settings.OSSParams): def next_physical(router_name, lag_name, oss_params=settings.OSSParams) -> dict:
# TODO: speed needed (if first interface) # TODO: speed needed (if first interface)
r = requests.post( r = requests.post(
f"{oss_params.RESOURCE_MANAGER_API_PREFIX}" f"/api/interfaces/next-physical/{router_name}/{lag_name}", f"{oss_params.RESOURCE_MANAGER_API_PREFIX}" f"/api/interfaces/next-physical/{router_name}/{lag_name}",
......
import ipaddress import ipaddress
from typing import Type import logging
from orchestrator.forms import FormPage from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label from orchestrator.forms.validators import Label
from orchestrator.targets import Target from orchestrator.targets import Target
from orchestrator.types import SubscriptionLifecycle, UUIDstr from orchestrator.types import SubscriptionLifecycle, UUIDstr, FormGenerator
from orchestrator.workflow import done, init, step, workflow from orchestrator.workflow import done, init, step, workflow, StepList
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import BaseModel
from gso.products.product_types.device import Device from gso.products.product_types.device import Device
from gso.services import ipam from gso.services import ipam
from gso.services._ipam import V4HostAddress, V6HostAddress
from gso.services.ipam import V4ServiceNetwork, V6ServiceNetwork
logger = logging.getLogger(__name__)
def initial_input_form_generator(subscription_id: UUIDstr) -> Type[BaseModel]:
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
subscription = Device.from_subscription(subscription_id) subscription = Device.from_subscription(subscription_id)
class TerminateForm(FormPage): class TerminateForm(FormPage):
...@@ -23,12 +26,13 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> Type[BaseModel]: ...@@ -23,12 +26,13 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> Type[BaseModel]:
return TerminateForm return TerminateForm
def _deprovision_in_user_management_system(fqdn: str) -> str: def _deprovision_in_user_management_system(fqdn: str) -> None:
logger.debug(fqdn)
pass pass
@step("Deprovision loopback IPs from IPAM/DNS") @step("Deprovision loopback IPs from IPAM/DNS")
def deprovision_loopback_ips(subscription: Device) -> None: def deprovision_loopback_ips(subscription: Device) -> dict[str, V4HostAddress | V6HostAddress]:
input_host_addresses = ipam.HostAddresses( input_host_addresses = ipam.HostAddresses(
v4=ipaddress.ip_address(subscription.device.device_lo_ipv4_address), v4=ipaddress.ip_address(subscription.device.device_lo_ipv4_address),
v6=ipaddress.ip_address(subscription.device.device_lo_ipv6_address), v6=ipaddress.ip_address(subscription.device.device_lo_ipv6_address),
...@@ -46,7 +50,7 @@ def deprovision_loopback_ips(subscription: Device) -> None: ...@@ -46,7 +50,7 @@ def deprovision_loopback_ips(subscription: Device) -> None:
@step("Deprovision SI- interface IPs from IPAM/DNS") @step("Deprovision SI- interface IPs from IPAM/DNS")
def deprovision_si_ips(subscription: Device) -> None: def deprovision_si_ips(subscription: Device) -> dict[str, V4ServiceNetwork | V6ServiceNetwork]:
service_network = ipam.delete_service_network( service_network = ipam.delete_service_network(
network=ipaddress.ip_network(subscription.device.device_si_ipv4_network), network=ipaddress.ip_network(subscription.device.device_si_ipv4_network),
service_type="SI", service_type="SI",
...@@ -55,7 +59,7 @@ def deprovision_si_ips(subscription: Device) -> None: ...@@ -55,7 +59,7 @@ def deprovision_si_ips(subscription: Device) -> None:
@step("Deprovision LT- interface (IAS) IPs from IPAM/DNS") @step("Deprovision LT- interface (IAS) IPs from IPAM/DNS")
def deprovision_lt_ips(subscription: Device) -> None: def deprovision_lt_ips(subscription: Device) -> dict[str, V4ServiceNetwork | V6ServiceNetwork]:
service_network_v4 = ipam.delete_service_network( service_network_v4 = ipam.delete_service_network(
network=ipaddress.ip_network(subscription.device.device_ias_lt_ipv4_network), network=ipaddress.ip_network(subscription.device.device_ias_lt_ipv4_network),
service_type="LT_IAS", service_type="LT_IAS",
...@@ -75,7 +79,7 @@ def deprovision_lt_ips(subscription: Device) -> None: ...@@ -75,7 +79,7 @@ def deprovision_lt_ips(subscription: Device) -> None:
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator), initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.TERMINATE, target=Target.TERMINATE,
) )
def terminate_device(): def terminate_device() -> StepList:
return ( return (
init init
>> store_process_subscription(Target.TERMINATE) >> store_process_subscription(Target.TERMINATE)
......
...@@ -2,12 +2,11 @@ from uuid import uuid4 ...@@ -2,12 +2,11 @@ from uuid import uuid4
from orchestrator.db.models import ProductTable, SubscriptionTable from orchestrator.db.models import ProductTable, SubscriptionTable
# noinspection PyProtectedMember
from orchestrator.forms import FormPage from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice, UniqueConstrainedList from orchestrator.forms.validators import Choice, UniqueConstrainedList
from orchestrator.targets import Target from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import done, init, step, workflow from orchestrator.workflow import done, init, step, workflow, StepList
from orchestrator.workflows.steps import resync, set_status, store_process_subscription from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form from orchestrator.workflows.utils import wrap_create_initial_input_form
...@@ -50,7 +49,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: ...@@ -50,7 +49,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator:
class AeMembersListA(UniqueConstrainedList[str]): class AeMembersListA(UniqueConstrainedList[str]):
min_items = initial_user_input.iptrunk_minimum_links min_items = initial_user_input.iptrunk_minimum_links
DeviceEnumA = Choice("Select a device", zip(devices.keys(), devices.items())) DeviceEnumA: str = Choice("Select a device", zip(devices.keys(), devices.items()))
class CreateIptrunkSideAForm(FormPage): class CreateIptrunkSideAForm(FormPage):
class Config: class Config:
...@@ -66,7 +65,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: ...@@ -66,7 +65,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator:
# We remove the selected device for side A, to prevent any loops # We remove the selected device for side A, to prevent any loops
devices.pop(str(user_input_side_a.iptrunk_sideA_node_id.name)) devices.pop(str(user_input_side_a.iptrunk_sideA_node_id.name))
DeviceEnumB = Choice("Select a device", zip(devices.keys(), devices.items())) DeviceEnumB: str = Choice("Select a device", zip(devices.keys(), devices.items()))
class AeMembersListB(UniqueConstrainedList[str]): class AeMembersListB(UniqueConstrainedList[str]):
min_items = len(user_input_side_a.iptrunk_sideA_ae_members) min_items = len(user_input_side_a.iptrunk_sideA_ae_members)
...@@ -161,7 +160,7 @@ def provision_ip_trunk_iface_dry(subscription: IptrunkProvisioning, process_id: ...@@ -161,7 +160,7 @@ def provision_ip_trunk_iface_dry(subscription: IptrunkProvisioning, process_id:
return { return {
"subscription": subscription, "subscription": subscription,
"label_text": ("Provision of the Trunk interface [DRY] " "Please refresh to get the results of the playbook"), "label_text": "[DRY RUN] Provisioning a trunk interface, please refresh to get the results of the playbook.",
} }
...@@ -171,7 +170,7 @@ def provision_ip_trunk_iface_real(subscription: IptrunkProvisioning, process_id: ...@@ -171,7 +170,7 @@ def provision_ip_trunk_iface_real(subscription: IptrunkProvisioning, process_id:
return { return {
"subscription": subscription, "subscription": subscription,
"label_text": ("Provision of the Trunk interface [REAL] " "Please refresh to get the results of the playbook"), "label_text": "Provisioning a trunk interface, please refresh to get the results of the playbook.",
} }
...@@ -181,7 +180,7 @@ def provision_ip_trunk_isis_iface_dry(subscription: IptrunkProvisioning, process ...@@ -181,7 +180,7 @@ def provision_ip_trunk_isis_iface_dry(subscription: IptrunkProvisioning, process
return { return {
"subscription": subscription, "subscription": subscription,
"label_text": ("Provision of the ISIS interface [DRY]" "Please refresh to get the results of the playbook"), "label_text": "[DRY RUN] Provisioning ISIS interfaces, please refresh to get the results of the playbook.",
} }
...@@ -191,7 +190,7 @@ def provision_ip_trunk_isis_iface_real(subscription: IptrunkProvisioning, proces ...@@ -191,7 +190,7 @@ def provision_ip_trunk_isis_iface_real(subscription: IptrunkProvisioning, proces
return { return {
"subscription": subscription, "subscription": subscription,
"label_text": ("Provision of the ISIS interface [REAL]" "Please refresh to get the results of the playbook"), "label_text": "Provisioning ISIS interfaces, please refresh to get the results of the playbook.",
} }
...@@ -201,7 +200,7 @@ def provision_ip_trunk_ldp_iface_dry(subscription: IptrunkProvisioning, process_ ...@@ -201,7 +200,7 @@ def provision_ip_trunk_ldp_iface_dry(subscription: IptrunkProvisioning, process_
return { return {
"subscription": subscription, "subscription": subscription,
"label_text": ("Provision of the LDP interface [DRY]" "Please refresh to get the results of the playbook"), "label_text": "[DRY RUN] Provisioning LDP interface, please refresh to get the results of the playbook.",
} }
...@@ -211,7 +210,7 @@ def provision_ip_trunk_ldp_iface_real(subscription: IptrunkProvisioning, process ...@@ -211,7 +210,7 @@ def provision_ip_trunk_ldp_iface_real(subscription: IptrunkProvisioning, process
return { return {
"subscription": subscription, "subscription": subscription,
"label_text": ("Provision of the LDP interface [REAL]" "Please refresh to get the results of the playbook"), "label_text": "Provisioning LDP interface, please refresh to get the results of the playbook.",
} }
...@@ -221,7 +220,7 @@ def provision_ip_trunk_lldp_iface_dry(subscription: IptrunkProvisioning, process ...@@ -221,7 +220,7 @@ def provision_ip_trunk_lldp_iface_dry(subscription: IptrunkProvisioning, process
return { return {
"subscription": subscription, "subscription": subscription,
"label_text": ("Provision of the LLDP interface [DRY]" "Please refresh to get the results of the playbook"), "label_text": "[DRY RUN] Provisioning LLDP interface, please refresh to get the results of the playbook.",
} }
...@@ -231,7 +230,7 @@ def provision_ip_trunk_lldp_iface_real(subscription: IptrunkProvisioning, proces ...@@ -231,7 +230,7 @@ def provision_ip_trunk_lldp_iface_real(subscription: IptrunkProvisioning, proces
return { return {
"subscription": subscription, "subscription": subscription,
"label_text": ("Provision of the LLDP interface [REAL]" "Please refresh to get the results of the playbook"), "label_text": "Provisioning LLDP interface, please refresh to get the results of the playbook.",
} }
...@@ -240,7 +239,7 @@ def provision_ip_trunk_lldp_iface_real(subscription: IptrunkProvisioning, proces ...@@ -240,7 +239,7 @@ def provision_ip_trunk_lldp_iface_real(subscription: IptrunkProvisioning, proces
initial_input_form=wrap_create_initial_input_form(initial_input_form_generator), initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
target=Target.CREATE, target=Target.CREATE,
) )
def create_iptrunk(): def create_iptrunk() -> StepList:
return ( return (
init init
>> create_subscription >> create_subscription
......
...@@ -3,7 +3,7 @@ from uuid import uuid4 ...@@ -3,7 +3,7 @@ from uuid import uuid4
from orchestrator.forms import FormPage from orchestrator.forms import FormPage
from orchestrator.targets import Target from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import done, init, step, workflow from orchestrator.workflow import done, init, step, workflow, StepList
from orchestrator.workflows.steps import resync, set_status, store_process_subscription from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form from orchestrator.workflows.utils import wrap_create_initial_input_form
...@@ -76,7 +76,7 @@ def initialize_subscription( ...@@ -76,7 +76,7 @@ def initialize_subscription(
initial_input_form=wrap_create_initial_input_form(initial_input_form_generator), initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
target=Target.CREATE, target=Target.CREATE,
) )
def create_site(): def create_site() -> StepList:
return ( return (
init init
>> create_subscription >> create_subscription
......
...@@ -23,7 +23,10 @@ exclude = ''' ...@@ -23,7 +23,10 @@ exclude = '''
''' '''
[tool.mypy] [tool.mypy]
exclude = "venv" exclude = [
"venv",
"gso/services/_ipam.py" # TODO: remove
]
ignore_missing_imports = true ignore_missing_imports = true
disallow_untyped_calls = true disallow_untyped_calls = true
disallow_untyped_defs = true disallow_untyped_defs = true
......
...@@ -8,7 +8,7 @@ import pytest ...@@ -8,7 +8,7 @@ import pytest
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def configuration_data(): def configuration_data() -> dict:
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.bind(("", 0)) s.bind(("", 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
...@@ -64,7 +64,7 @@ def configuration_data(): ...@@ -64,7 +64,7 @@ def configuration_data():
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def data_config_filename(configuration_data): def data_config_filename(configuration_data) -> str:
file_name = os.path.join(tempfile.gettempdir(), os.urandom(24).hex()) file_name = os.path.join(tempfile.gettempdir(), os.urandom(24).hex())
open(file_name, "x").close() open(file_name, "x").close()
with open(file_name, "wb") as f: with open(file_name, "wb") as f:
......
import ipaddress import ipaddress
import re import re
from os import PathLike
import pytest import pytest
import responses import responses
...@@ -8,7 +9,7 @@ from gso.services import ipam ...@@ -8,7 +9,7 @@ from gso.services import ipam
@responses.activate @responses.activate
def test_new_service_networks(data_config_filename): def test_new_service_networks(data_config_filename: PathLike):
responses.add( responses.add(
method=responses.POST, method=responses.POST,
url=re.compile(r".*/wapi.*/network.*"), url=re.compile(r".*/wapi.*/network.*"),
...@@ -39,7 +40,7 @@ def test_new_service_networks(data_config_filename): ...@@ -39,7 +40,7 @@ def test_new_service_networks(data_config_filename):
@responses.activate @responses.activate
def test_new_service_host(data_config_filename): def test_new_service_host(data_config_filename: PathLike):
responses.add( responses.add(
method=responses.POST, method=responses.POST,
url=re.compile(r".*/wapi.*/record:host$"), url=re.compile(r".*/wapi.*/record:host$"),
...@@ -193,7 +194,7 @@ def test_new_service_host(data_config_filename): ...@@ -193,7 +194,7 @@ def test_new_service_host(data_config_filename):
@responses.activate @responses.activate
def test_delete_service_network(data_config_filename): def test_delete_service_network(data_config_filename: PathLike):
responses.add( responses.add(
method=responses.GET, method=responses.GET,
url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"),
...@@ -288,7 +289,7 @@ def test_delete_service_network(data_config_filename): ...@@ -288,7 +289,7 @@ def test_delete_service_network(data_config_filename):
@responses.activate @responses.activate
def test_delete_service_host(data_config_filename): def test_delete_service_host(data_config_filename: PathLike):
responses.add( responses.add(
method=responses.GET, method=responses.GET,
url=re.compile(r".*/wapi.*record:host.*"), url=re.compile(r".*/wapi.*record:host.*"),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment