Skip to content
Snippets Groups Projects
Verified Commit f94f214e authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

fix linting errors in utils module

parent a972136d
Branches
Tags
1 merge request!111Feature/ruff everything party hat emoji
...@@ -8,6 +8,7 @@ import ipaddress ...@@ -8,6 +8,7 @@ import ipaddress
import json import json
import logging import logging
import os import os
from pathlib import Path
from pydantic import BaseSettings, NonNegativeInt from pydantic import BaseSettings, NonNegativeInt
...@@ -43,10 +44,14 @@ class InfoBloxParams(BaseSettings): ...@@ -43,10 +44,14 @@ class InfoBloxParams(BaseSettings):
class V4Netmask(NonNegativeInt): class V4Netmask(NonNegativeInt):
"""A valid netmask for an IPv4 network or address."""
le = 32 le = 32
class V6Netmask(NonNegativeInt): class V6Netmask(NonNegativeInt):
"""A valid netmask for an IPv6 network or address."""
le = 128 le = 128
...@@ -118,8 +123,8 @@ class OSSParams(BaseSettings): ...@@ -118,8 +123,8 @@ class OSSParams(BaseSettings):
def load_oss_params() -> OSSParams: def load_oss_params() -> OSSParams:
"""Look for OSS_PARAMS_FILENAME in the environment and load the parameters from that file.""" """Look for ``OSS_PARAMS_FILENAME`` in the environment and load the parameters from that file."""
with open(os.environ["OSS_PARAMS_FILENAME"], encoding="utf-8") as file: with Path(os.environ["OSS_PARAMS_FILENAME"]).open(encoding="utf-8") as file:
return OSSParams(**json.loads(file.read())) return OSSParams(**json.loads(file.read()))
......
"""Utility module that contains helper methods, exceptions, etc."""
"""Utility module that defines facts about different tiers of sites. Used by Netbox when creating a new device."""
from pydantic import BaseModel from pydantic import BaseModel
class ModuleInfo(BaseModel): class ModuleInfo(BaseModel):
"""A collection of facts that define the tier of a site."""
device_type: str device_type: str
module_bays_slots: list[int] module_bays_slots: list[int]
module_type: str module_type: str
...@@ -10,7 +14,10 @@ class ModuleInfo(BaseModel): ...@@ -10,7 +14,10 @@ class ModuleInfo(BaseModel):
class TierInfo: class TierInfo:
"""Information for different tiers of sites."""
def __init__(self) -> None: def __init__(self) -> None:
"""Initialise the different tiers of sites that exist."""
self.Tier1 = ModuleInfo( self.Tier1 = ModuleInfo(
device_type="7750 SR-7s", device_type="7750 SR-7s",
module_bays_slots=[1, 2], module_bays_slots=[1, 2],
...@@ -27,6 +34,7 @@ class TierInfo: ...@@ -27,6 +34,7 @@ class TierInfo:
) )
def get_module_by_name(self, name: str) -> ModuleInfo: def get_module_by_name(self, name: str) -> ModuleInfo:
"""Retrieve a module by name."""
return getattr(self, name) return getattr(self, name)
......
"""Custom exceptions for :term:`GSO`."""
class NotFoundError(Exception): class NotFoundError(Exception):
"""Exception raised for not found search.""" """Exception raised for not found search."""
......
"""Helper methods that are used across :term:`GSO`."""
import ipaddress import ipaddress
import re import re
from ipaddress import IPv4Address from ipaddress import IPv4Address
...@@ -19,17 +21,21 @@ from gso.services.subscriptions import get_active_subscriptions_by_field_and_val ...@@ -19,17 +21,21 @@ from gso.services.subscriptions import get_active_subscriptions_by_field_and_val
class LAGMember(BaseModel): class LAGMember(BaseModel):
"""A :term:`LAG` member interface that consists of a name and description."""
# TODO: validate interface name # TODO: validate interface name
interface_name: str interface_name: str
interface_description: str interface_description: str
def __hash__(self) -> int: def __hash__(self) -> int:
"""Calculate the hash based on the interface name and description, so that uniqueness can be determined."""
# TODO: check if this is still needed # TODO: check if this is still needed
return hash((self.interface_name, self.interface_description)) return hash((self.interface_name, self.interface_description))
@step("[COMMIT] Set ISIS metric to 90.000") @step("[COMMIT] Set ISIS metric to 90.000")
def set_isis_to_90000(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State: def set_isis_to_90000(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State:
"""Workflow step for setting the :term:`ISIS` metric to 90k as an arbitrarily high value to drain a link."""
old_isis_metric = subscription.iptrunk.iptrunk_isis_metric old_isis_metric = subscription.iptrunk.iptrunk_isis_metric
subscription.iptrunk.iptrunk_isis_metric = 90000 subscription.iptrunk.iptrunk_isis_metric = 90000
provisioning_proxy.provision_ip_trunk( provisioning_proxy.provision_ip_trunk(
...@@ -175,20 +181,22 @@ def validate_ipv4_or_ipv6(value: str) -> str: ...@@ -175,20 +181,22 @@ def validate_ipv4_or_ipv6(value: str) -> str:
"""Validate that a value is a valid IPv4 or IPv6 address.""" """Validate that a value is a valid IPv4 or IPv6 address."""
try: try:
ipaddress.ip_address(value) ipaddress.ip_address(value)
return value
except ValueError as e: except ValueError as e:
msg = "Enter a valid IPv4 or IPv6 address." msg = "Enter a valid IPv4 or IPv6 address."
raise ValueError(msg) from e raise ValueError(msg) from e
else:
return value
def validate_country_code(country_code: str) -> str: def validate_country_code(country_code: str) -> str:
"""Validate that a country code is valid.""" """Validate that a country code is valid."""
try: try:
pycountry.countries.lookup(country_code) pycountry.countries.lookup(country_code)
return country_code
except LookupError as e: except LookupError as e:
msg = "Invalid or non-existent country code, it must be in ISO 3166-1 alpha-2 format." msg = "Invalid or non-existent country code, it must be in ISO 3166-1 alpha-2 format."
raise ValueError(msg) from e raise ValueError(msg) from e
else:
return country_code
def validate_site_name(site_name: str) -> str: def validate_site_name(site_name: str) -> str:
...@@ -198,8 +206,9 @@ def validate_site_name(site_name: str) -> str: ...@@ -198,8 +206,9 @@ def validate_site_name(site_name: str) -> str:
""" """
pattern = re.compile(r"^[A-Z]{3}[0-9]?$") pattern = re.compile(r"^[A-Z]{3}[0-9]?$")
if not pattern.match(site_name): if not pattern.match(site_name):
msg = "Enter a valid site name. It must consist of three uppercase letters (A-Z) followed by an optional single digit (0-9)." msg = (
raise ValueError( "Enter a valid site name. It must consist of three uppercase letters (A-Z) followed by an optional single "
msg, "digit (0-9)."
) )
raise ValueError(msg)
return site_name return site_name
"""Module that sets up :term:`GSO` as a Celery worker. This will allow for the scheduling of regular task workflows."""
from celery import Celery from celery import Celery
from gso import init_worker_app from gso import init_worker_app
...@@ -5,7 +7,10 @@ from gso.settings import load_oss_params ...@@ -5,7 +7,10 @@ from gso.settings import load_oss_params
class OrchestratorCelery(Celery): class OrchestratorCelery(Celery):
def on_init(self) -> None: """A :term:`GSO` instance that functions as a Celery worker."""
def on_init(self) -> None: # noqa: PLR6301
"""Initialise a new Celery worker."""
init_worker_app() init_worker_app()
......
...@@ -107,7 +107,7 @@ def initialize_subscription( ...@@ -107,7 +107,7 @@ def initialize_subscription(
@step("Allocate loopback interfaces in IPAM") @step("Allocate loopback interfaces in IPAM")
def ipam_allocate_loopback(subscription: RouterProvisioning, is_ias_connected: bool) -> State: def ipam_allocate_loopback(subscription: RouterProvisioning, is_ias_connected: bool) -> State: # noqa: FBT001
fqdn = subscription.router.router_fqdn fqdn = subscription.router.router_fqdn
loopback_v4, loopback_v6 = infoblox.allocate_host(f"lo0.{fqdn}", "LO", [fqdn], str(subscription.subscription_id)) loopback_v4, loopback_v6 = infoblox.allocate_host(f"lo0.{fqdn}", "LO", [fqdn], str(subscription.subscription_id))
......
...@@ -24,9 +24,8 @@ def test_latitude(input_value, is_valid): ...@@ -24,9 +24,8 @@ def test_latitude(input_value, is_valid):
if is_valid: if is_valid:
assert LatitudeCoordinate.validate(input_value) == input_value assert LatitudeCoordinate.validate(input_value) == input_value
else: else:
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError, match="Invalid latitude coordinate"):
LatitudeCoordinate.validate(input_value) LatitudeCoordinate.validate(input_value)
assert "Invalid latitude coordinate" in str(excinfo.value)
@pytest.mark.parametrize( @pytest.mark.parametrize(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment