Skip to content
Snippets Groups Projects
Verified Commit a972136d authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

fix linting errors in services

parent c5063364
No related tags found
1 merge request!111Feature/ruff everything party hat emoji
......@@ -40,7 +40,6 @@ class LAGMemberList(UniqueConstrainedList[T_co]):
"""A list of :term:`LAG` member interfaces."""
class IptrunkInterfaceBlockInactive(
ProductBlockModel,
lifecycle=[SubscriptionLifecycle.INITIAL],
......
"""A module that returns the customers available in :term:`GSO`.
For the time being, it's hardcoded to only contain GÉANT as a customer, since this is needed for PHASE 1 deployment.
"""
from typing import Any
from pydantic_forms.validators import Choice
......@@ -8,6 +13,7 @@ class CustomerNotFoundError(Exception):
def all_customers() -> list[dict]:
"""Hardcoded list of customers available in :term:`GSO`."""
return [
{
"id": "8f0df561-ce9d-4d9c-89a8-7953d3ffc961",
......@@ -17,6 +23,7 @@ def all_customers() -> list[dict]:
def get_customer_by_name(name: str) -> dict[str, Any]:
"""Try to get a customer by their name."""
for customer in all_customers():
if customer["name"] == name:
return customer
......@@ -26,6 +33,7 @@ def get_customer_by_name(name: str) -> dict[str, Any]:
def customer_selector() -> Choice:
"""GUI input field for selecting a customer."""
customers = {}
for customer in all_customers():
customers[customer["id"]] = customer["name"]
......
"""The Infoblox service that allows :term:`GSO` to allocate :term:`IPAM` resources that are used in products."""
import ipaddress
from logging import getLogger
......@@ -13,11 +15,11 @@ logger = getLogger(__name__)
class AllocationError(Exception):
pass
"""Raised when Infoblox failed to allocate a resource."""
class DeletionError(Exception):
pass
"""Raised when Infoblox failed to delete a resource."""
def _setup_connection() -> tuple[connector.Connector, IPAMParams]:
......@@ -67,7 +69,8 @@ def _allocate_network(
created_net = objects.Network.create(conn, network=str(network), dns_view=dns_view, comment=comment)
if created_net.response != "Infoblox Object already Exists":
return ipaddress.ip_network(created_net.network)
logger.warning(f"IP container {container} appears to be full.")
msg = f"IP container {container} appears to be full."
logger.warning(msg)
msg = f"Cannot allocate anything in {containers}, check whether any IP space is available."
raise AllocationError(msg)
......@@ -203,7 +206,8 @@ def allocate_host(
)
created_v6 = ipaddress.IPv6Address(new_host.ipv6addr)
except InfobloxCannotCreateObject:
logger.warning(f"Cannot find 1 available IP address in network {ipv6_range}.")
msg = f"Cannot find 1 available IP address in network {ipv6_range}."
logger.warning(msg)
if created_v6 is None:
msg = f"Cannot find 1 available IP address in networks {allocation_networks_v6}."
......@@ -220,7 +224,8 @@ def allocate_host(
new_host = objects.HostRecord.search(conn, name=hostname)
created_v4 = ipaddress.IPv4Address(new_host.ipv4addr)
except InfobloxCannotUpdateObject:
logger.warning(f"Cannot find 1 available IP address in network {ipv4_range}.")
msg = f"Cannot find 1 available IP address in network {ipv4_range}."
logger.warning(msg)
if created_v4 is None:
msg = f"Cannot find 1 available IP address in networks {allocation_networks_v4}."
......@@ -238,7 +243,7 @@ def find_host_by_ip(
:type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address
"""
conn, _ = _setup_connection()
if ip_addr.version == 4:
if ip_addr.version == 4: # noqa: PLR2004, the 4 in IPv4 is well-known and not a "magic value".
return objects.HostRecord.search(
conn,
ipv4addr=ip_addr,
......
......@@ -53,10 +53,12 @@ class NetboxClient:
"""Implement all methods to communicate with the Netbox :term:`API`."""
def __init__(self) -> None:
"""Instantiate a new Netbox client."""
self.netbox_params = load_oss_params().NETBOX
self.netbox = pynetbox.api(self.netbox_params.api, self.netbox_params.token)
def get_all_devices(self) -> list[Devices]:
"""Get all devices in Netbox."""
return list(self.netbox.dcim.devices.all())
def get_allocated_interfaces_by_gso_subscription(self, device_name: str, subscription_id: UUID) -> list[Interfaces]:
......@@ -96,9 +98,10 @@ class NetboxClient:
def create_interface(
self,
iface_name: str,
type: str,
interface_type: str,
device_name: str,
description: str | None = None,
*,
enabled: bool = False,
) -> Interfaces:
"""Create new interface on a device, where device is defined by name.
......@@ -111,7 +114,7 @@ class NetboxClient:
return self.netbox.dcim.interfaces.create(
name=iface_name,
type=type,
type=interface_type,
enabled=enabled,
mark_connected=False,
device=device.id,
......@@ -131,15 +134,18 @@ class NetboxClient:
return self.netbox.dcim.device_types.create(dict(device_type))
def create_device_role(self, name: str, slug: str) -> DeviceRole:
device_role = DeviceRole(**{"name": name, "slug": slug})
"""Create a new device role."""
device_role = DeviceRole(name=name, slug=slug)
return self.netbox.dcim.device_roles.create(dict(device_role))
def create_device_site(self, name: str, slug: str) -> Site:
device_site = Site(**{"name": name, "slug": slug})
"""Create a new site for devices."""
device_site = Site(name=name, slug=slug)
return self.netbox.dcim.sites.create(dict(device_site))
def create_device_manufacturer(self, name: str, slug: str) -> Manufacturer:
device_manufacturer = Manufacturer(**{"name": name, "slug": slug})
"""Create a new device manufacturer."""
device_manufacturer = Manufacturer(name=name, slug=slug)
return self.netbox.dcim.manufacturers.create(dict(device_manufacturer))
@staticmethod
......
......@@ -5,6 +5,7 @@
import json
import logging
from functools import partial
from http import HTTPStatus
import requests
from orchestrator import step
......@@ -55,7 +56,8 @@ def _send_request(operation: CUDOperation, endpoint: str, parameters: dict, call
# Build up a callback URL of the Provisioning Proxy to return its results to.
callback_url = f"{oss.GENERAL.public_hostname}{callback_route}"
logger.debug(f"[provisioning proxy] Callback URL set to {callback_url}")
debug_msg = f"[provisioning proxy] Callback URL set to {callback_url}"
logger.debug(debug_msg)
parameters.update({"callback": callback_url})
url = f"{pp_params.scheme}://{pp_params.api_base}/api/{endpoint}"
......@@ -70,7 +72,7 @@ def _send_request(operation: CUDOperation, endpoint: str, parameters: dict, call
elif operation == CUDOperation.DELETE:
request = requests.delete(url, json=parameters, timeout=10000)
if request.status_code != 200:
if request.status_code != HTTPStatus.OK:
logger.debug(request.content)
raise AssertionError(request.content)
......@@ -85,6 +87,7 @@ def provision_router(
process_id: UUIDstr,
callback_route: str,
tt_number: str,
*,
dry_run: bool = True,
) -> None:
"""Provision a new router using :term:`LSO`.
......@@ -117,6 +120,7 @@ def provision_ip_trunk(
callback_route: str,
tt_number: str,
config_object: str,
*,
dry_run: bool = True,
removed_ae_members: list[str] | None = None,
) -> None:
......@@ -186,6 +190,7 @@ def deprovision_ip_trunk(
process_id: UUIDstr,
callback_route: str,
tt_number: str,
*,
dry_run: bool = True,
) -> None:
"""Deprovision an IP trunk service using :term:`LSO`.
......@@ -224,6 +229,7 @@ def migrate_ip_trunk(
tt_number: str,
verb: str,
config_object: str,
*,
dry_run: bool = True,
) -> None:
"""Migrate an IP trunk service using :term:`LSO`.
......
"""A collection of methods that make interaction with coreDB more straight-forward.
This prevents someone from having to re-write database statements multiple times, that might turn out to be erroneous
or inconsistent when not careful.
"""
from typing import Any
from uuid import UUID
......
......@@ -32,7 +32,9 @@ class LAGMember(BaseModel):
def set_isis_to_90000(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State:
old_isis_metric = subscription.iptrunk.iptrunk_isis_metric
subscription.iptrunk.iptrunk_isis_metric = 90000
provisioning_proxy.provision_ip_trunk(subscription, process_id, callback_route, tt_number, "isis_interface", False)
provisioning_proxy.provision_ip_trunk(
subscription, process_id, callback_route, tt_number, "isis_interface", dry_run=False,
)
return {
"subscription": subscription,
......
......@@ -235,7 +235,9 @@ def provision_ip_trunk_iface_dry(
process_id: UUIDstr,
tt_number: str,
) -> State:
provisioning_proxy.provision_ip_trunk(subscription, process_id, callback_route, tt_number, "trunk_interface", True)
provisioning_proxy.provision_ip_trunk(
subscription, process_id, callback_route, tt_number, "trunk_interface", dry_run=True,
)
return {"subscription": subscription}
......@@ -247,7 +249,9 @@ def provision_ip_trunk_iface_real(
process_id: UUIDstr,
tt_number: str,
) -> State:
provisioning_proxy.provision_ip_trunk(subscription, process_id, callback_route, tt_number, "trunk_interface", False)
provisioning_proxy.provision_ip_trunk(
subscription, process_id, callback_route, tt_number, "trunk_interface", dry_run=False,
)
return {"subscription": subscription}
......@@ -283,7 +287,9 @@ def provision_ip_trunk_isis_iface_real(
process_id: UUIDstr,
tt_number: str,
) -> State:
provisioning_proxy.provision_ip_trunk(subscription, process_id, callback_route, tt_number, "isis_interface", False)
provisioning_proxy.provision_ip_trunk(
subscription, process_id, callback_route, tt_number, "isis_interface", dry_run=False,
)
return {"subscription": subscription}
......@@ -309,7 +315,7 @@ def reserve_interfaces_in_netbox(subscription: IptrunkProvisioning) -> State:
# Create LAG interfaces
lag_interface: Interfaces = nbclient.create_interface(
iface_name=trunk_side.iptrunk_side_ae_iface,
type="lag",
interface_type="lag",
device_name=trunk_side.iptrunk_side_node.router_fqdn,
description=str(subscription.subscription_id),
enabled=True,
......
......@@ -205,7 +205,7 @@ def disable_old_config_real(
tt_number,
"deactivate",
"deactivate",
False,
dry_run=False,
)
return {
......@@ -266,7 +266,7 @@ def deploy_new_config_real(
tt_number,
"deploy",
"trunk_interface",
False,
dry_run=False,
)
logger.warning("Playbook verb is not yet properly set.")
......@@ -314,7 +314,7 @@ def deploy_new_isis(
tt_number,
"deploy",
"isis_interface",
False,
dry_run=False,
)
logger.warning("Playbook verb is not yet properly set.")
......@@ -346,7 +346,9 @@ def restore_isis_metric(
old_isis_metric: int,
) -> State:
subscription.iptrunk.iptrunk_isis_metric = old_isis_metric
provisioning_proxy.provision_ip_trunk(subscription, process_id, callback_route, tt_number, "isis_interface", False)
provisioning_proxy.provision_ip_trunk(
subscription, process_id, callback_route, tt_number, "isis_interface", dry_run=False,
)
return {"subscription": subscription}
......@@ -402,7 +404,7 @@ def delete_old_config_real(
tt_number,
"delete",
"delete",
False,
dry_run=False,
)
logger.warning("Playbook verb is not yet properly set.")
......@@ -456,7 +458,7 @@ def reserve_interfaces_in_netbox(
# Create LAG interfaces
lag_interface: Interfaces = nbclient.create_interface(
iface_name=new_lag_interface,
type="lag",
interface_type="lag",
device_name=new_side.router_fqdn,
description=str(subscription.subscription_id),
enabled=True,
......
......@@ -48,7 +48,9 @@ def provision_ip_trunk_isis_iface_real(
callback_route: str,
tt_number: str,
) -> State:
provisioning_proxy.provision_ip_trunk(subscription, process_id, callback_route, tt_number, "isis_interface", False)
provisioning_proxy.provision_ip_trunk(
subscription, process_id, callback_route, tt_number, "isis_interface", dry_run=False,
)
return {"subscription": subscription}
......
......@@ -42,21 +42,23 @@ def drain_traffic_from_ip_trunk(
callback_route: str,
tt_number: str,
) -> State:
provisioning_proxy.provision_ip_trunk(subscription, process_id, callback_route, tt_number, "isis_interface", False)
provisioning_proxy.provision_ip_trunk(
subscription, process_id, callback_route, tt_number, "isis_interface", dry_run=False,
)
return {"subscription": subscription}
@step("Deprovision IP trunk [DRY RUN]")
def deprovision_ip_trunk_dry(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State:
provisioning_proxy.deprovision_ip_trunk(subscription, process_id, callback_route, tt_number, True)
provisioning_proxy.deprovision_ip_trunk(subscription, process_id, callback_route, tt_number, dry_run=True)
return {"subscription": subscription}
@step("Deprovision IP trunk [FOR REAL]")
def deprovision_ip_trunk_real(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State:
provisioning_proxy.deprovision_ip_trunk(subscription, process_id, callback_route, tt_number, False)
provisioning_proxy.deprovision_ip_trunk(subscription, process_id, callback_route, tt_number, dry_run=False)
return {"subscription": subscription}
......
......@@ -158,7 +158,7 @@ def provision_router_real(
callback_route: str,
tt_number: str,
) -> State:
provisioning_proxy.provision_router(subscription, process_id, callback_route, tt_number, False)
provisioning_proxy.provision_router(subscription, process_id, callback_route, tt_number, dry_run=False)
return {"subscription": subscription}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment