Skip to content
Snippets Groups Projects
Verified Commit 93f7dcbf authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

resolve ruff linting errors

parent e6504717
No related branches found
No related tags found
1 merge request!36Add linter tools, and resolve all linting errors
...@@ -4,6 +4,7 @@ from typing import Union ...@@ -4,6 +4,7 @@ from typing import Union
from pydantic import BaseSettings from pydantic import BaseSettings
from gso.services import _ipam from gso.services import _ipam
from gso.services._ipam import V4HostAddress, V6HostAddress
class V4ServiceNetwork(BaseSettings): class V4ServiceNetwork(BaseSettings):
...@@ -19,20 +20,14 @@ class ServiceNetworks(BaseSettings): ...@@ -19,20 +20,14 @@ class ServiceNetworks(BaseSettings):
v6: ipaddress.IPv6Network v6: ipaddress.IPv6Network
class V4HostAddress(BaseSettings):
v4: ipaddress.IPv4Address
class V6HostAddress(BaseSettings):
v6: ipaddress.IPv6Address
class HostAddresses(BaseSettings): class HostAddresses(BaseSettings):
v4: ipaddress.IPv4Address v4: ipaddress.IPv4Address
v6: ipaddress.IPv6Address v6: ipaddress.IPv6Address
def new_service_networks(service_type="", comment="", extattrs={}) -> ServiceNetworks: def new_service_networks(service_type="", comment="", extattrs=None) -> ServiceNetworks:
if extattrs is None:
extattrs = {}
v4_service_network = _ipam.allocate_service_ipv4_network( v4_service_network = _ipam.allocate_service_ipv4_network(
service_type=service_type, comment=comment, extattrs=extattrs service_type=service_type, comment=comment, extattrs=extattrs
) )
...@@ -48,8 +43,10 @@ def new_service_host( ...@@ -48,8 +43,10 @@ def new_service_host(
service_networks: ServiceNetworks = None, service_networks: ServiceNetworks = None,
host_addresses: HostAddresses = None, host_addresses: HostAddresses = None,
cname_aliases=None, cname_aliases=None,
extattrs={}, extattrs=None,
) -> HostAddresses: ) -> HostAddresses:
if extattrs is None:
extattrs = {}
return _ipam.allocate_service_host( return _ipam.allocate_service_host(
hostname=hostname, hostname=hostname,
service_type=service_type, service_type=service_type,
...@@ -67,8 +64,10 @@ def delete_service_network( ...@@ -67,8 +64,10 @@ def delete_service_network(
def delete_service_host( def delete_service_host(
hostname="", host_addresses: HostAddresses = None, cname_aliases=[], service_type="" hostname="", host_addresses: HostAddresses = None, cname_aliases=None, service_type=""
) -> HostAddresses: ) -> V4HostAddress | V6HostAddress:
if cname_aliases is None:
cname_aliases = []
return _ipam.delete_service_host( return _ipam.delete_service_host(
hostname=hostname, host_addresses=host_addresses, cname_aliases=cname_aliases, service_type=service_type hostname=hostname, host_addresses=host_addresses, cname_aliases=cname_aliases, service_type=service_type
) )
"""The Provisioning Proxy service, which interacts with LSO running externally. """The Provisioning Proxy service, which interacts with LSO running externally.
LSO is responsible for executing Ansible playbooks, that deploy subscriptions. LSO is responsible for executing Ansible playbooks, that deploy subscriptions.
""" """
import json import json
...@@ -8,8 +9,6 @@ import requests ...@@ -8,8 +9,6 @@ import requests
from orchestrator import inputstep from orchestrator import inputstep
from orchestrator.config.assignee import Assignee from orchestrator.config.assignee import Assignee
from orchestrator.domain import SubscriptionModel from orchestrator.domain import SubscriptionModel
# noinspection PyProtectedMember
from orchestrator.forms import FormPage, ReadOnlyField from orchestrator.forms import FormPage, ReadOnlyField
from orchestrator.forms.validators import Accept, Label, LongText from orchestrator.forms.validators import Accept, Label, LongText
from orchestrator.types import State, UUIDstr, strEnum from orchestrator.types import State, UUIDstr, strEnum
...@@ -25,6 +24,7 @@ logger = logging.getLogger(__name__) ...@@ -25,6 +24,7 @@ logger = logging.getLogger(__name__)
class CUDOperation(strEnum): class CUDOperation(strEnum):
"""Enum for different C(R)UD operations that the provisioning proxy supports. """Enum for different C(R)UD operations that the provisioning proxy supports.
Read is not applicable, hence these become CUD and not CRUD operations. Read is not applicable, hence these become CUD and not CRUD operations.
""" """
...@@ -37,8 +37,7 @@ class CUDOperation(strEnum): ...@@ -37,8 +37,7 @@ class CUDOperation(strEnum):
def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operation: CUDOperation): def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operation: CUDOperation):
"""Internal function for sending a request to LSO. The callback address is """Send a request to LSO. The callback address is derived using the process ID provided.
derived using the process ID provided.
:param str endpoint: The LSO-specific endpoint to call, depending on the :param str endpoint: The LSO-specific endpoint to call, depending on the
type of service object that is acted upon. type of service object that is acted upon.
...@@ -70,12 +69,12 @@ def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operatio ...@@ -70,12 +69,12 @@ def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operatio
request = requests.delete(url, json=parameters, timeout=10000) request = requests.delete(url, json=parameters, timeout=10000)
if request.status_code != 200: if request.status_code != 200:
print(request.content) logger.debug(request.content)
raise AssertionError(request.content) raise AssertionError(request.content)
def provision_device(subscription: DeviceProvisioning, process_id: UUIDstr, dry_run: bool = True): def provision_device(subscription: DeviceProvisioning, process_id: UUIDstr, dry_run: bool = True):
"""Function that provisions a new device using LSO. """Provision a new device using LSO.
:param :class:`DeviceProvisioning` subscription: The subscription object :param :class:`DeviceProvisioning` subscription: The subscription object
that is to be provisioned. that is to be provisioned.
...@@ -91,7 +90,7 @@ def provision_device(subscription: DeviceProvisioning, process_id: UUIDstr, dry_ ...@@ -91,7 +90,7 @@ def provision_device(subscription: DeviceProvisioning, process_id: UUIDstr, dry_
def provision_ip_trunk( def provision_ip_trunk(
subscription: IptrunkProvisioning, process_id: UUIDstr, config_object: str, dry_run: bool = True subscription: IptrunkProvisioning, process_id: UUIDstr, config_object: str, dry_run: bool = True
): ):
"""Function that provisions an IP trunk service using LSO. """Provision an IP trunk service using LSO.
:param :class:`IptrunkProvisioning` subscription: The subscription object :param :class:`IptrunkProvisioning` subscription: The subscription object
that is to be provisioned. that is to be provisioned.
...@@ -136,7 +135,7 @@ def provision_ip_trunk( ...@@ -136,7 +135,7 @@ def provision_ip_trunk(
def deprovision_ip_trunk(subscription: Iptrunk, process_id: UUIDstr, dry_run: bool = True): def deprovision_ip_trunk(subscription: Iptrunk, process_id: UUIDstr, dry_run: bool = True):
"""Function that provisions an IP trunk service using LSO. """Deprovision an IP trunk service using LSO.
:param :class:`IptrunkProvisioning` subscription: The subscription object :param :class:`IptrunkProvisioning` subscription: The subscription object
that is to be provisioned. that is to be provisioned.
...@@ -162,7 +161,7 @@ def await_pp_results(subscription: SubscriptionModel, label_text: str) -> State: ...@@ -162,7 +161,7 @@ def await_pp_results(subscription: SubscriptionModel, label_text: str) -> State:
@validator("pp_run_results", allow_reuse=True, pre=True, always=True) @validator("pp_run_results", allow_reuse=True, pre=True, always=True)
def run_results_must_be_given(cls, run_results): def run_results_must_be_given(cls, run_results):
if run_results is None: if run_results is None:
raise ValueError("Run results may not be empty. " "Wait for the provisioning proxy to finish.") raise ValueError("Run results may not be empty. Wait for the provisioning proxy to finish.")
return run_results return run_results
result_page = yield ProvisioningResultPage result_page = yield ProvisioningResultPage
......
"""GSO settings, ensuring that the required parameters are set correctly.""" """GSO settings, ensuring that the required parameters are set correctly."""
import ipaddress import ipaddress
import json import json
import logging
import os import os
# from pydantic import BaseSettings, Field
from pydantic import BaseSettings from pydantic import BaseSettings
logger = logging.getLogger(__name__)
class GeneralParams(BaseSettings): class GeneralParams(BaseSettings):
"""General parameters for a GSO configuration file.""" """General parameters for a GSO configuration file."""
...@@ -42,8 +44,9 @@ class V6NetworkParams(BaseSettings): ...@@ -42,8 +44,9 @@ class V6NetworkParams(BaseSettings):
class ServiceNetworkParams(BaseSettings): class ServiceNetworkParams(BaseSettings):
"""Parameters for InfoBlox that describe IPv4 and v6 networks, and the """Parameters for InfoBlox.
corresponding domain name that should be used as a suffix.
The parameters describe IPv4 and v6 networks, and the corresponding domain name that should be used as a suffix.
""" """
V4: V4NetworkParams V4: V4NetworkParams
...@@ -82,12 +85,10 @@ class OSSParams(BaseSettings): ...@@ -82,12 +85,10 @@ class OSSParams(BaseSettings):
def load_oss_params() -> OSSParams: def load_oss_params() -> OSSParams:
"""Look for OSS_PARAMS_FILENAME in the environment and load the parameters """Look for OSS_PARAMS_FILENAME in the environment and load the parameters from that file."""
from that file.
"""
with open(os.environ["OSS_PARAMS_FILENAME"], encoding="utf-8") as file: with open(os.environ["OSS_PARAMS_FILENAME"], encoding="utf-8") as file:
return OSSParams(**json.loads(file.read())) return OSSParams(**json.loads(file.read()))
if __name__ == "__main__": if __name__ == "__main__":
print(load_oss_params()) logger.debug(load_oss_params())
import ipaddress import ipaddress
from typing import Type
from orchestrator.forms import FormPage from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label from orchestrator.forms.validators import Label
from orchestrator.targets import Target from orchestrator.targets import Target
from orchestrator.types import InputForm, SubscriptionLifecycle, UUIDstr from orchestrator.types import SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import done, init, step, workflow from orchestrator.workflow import done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import BaseModel
from gso.products.product_types.device import Device from gso.products.product_types.device import Device
from gso.services import ipam from gso.services import ipam
def initial_input_form_generator(subscription_id: UUIDstr) -> InputForm: def initial_input_form_generator(subscription_id: UUIDstr) -> Type[BaseModel]:
subscription = Device.from_subscription(subscription_id) subscription = Device.from_subscription(subscription_id)
class TerminateForm(FormPage): class TerminateForm(FormPage):
......
...@@ -17,6 +17,7 @@ exclude = ''' ...@@ -17,6 +17,7 @@ exclude = '''
| \.tox | \.tox
| venv | venv
| gso/migrations | gso/migrations
| gso/services/_ipam\.py
)/ )/
) )
''' '''
...@@ -51,7 +52,8 @@ exclude = [ ...@@ -51,7 +52,8 @@ exclude = [
"__pycache__", "__pycache__",
"htmlcov", "htmlcov",
"venv", "venv",
"gso/migrations" "gso/migrations",
"_ipam.py" # TODO: remove
] ]
ignore = [ ignore = [
"C417", "C417",
...@@ -72,7 +74,8 @@ ignore = [ ...@@ -72,7 +74,8 @@ ignore = [
"N803", "N803",
"N801", "N801",
"N815", "N815",
"N802" "N802",
"S101"
] ]
line-length = 120 line-length = 120
select = [ select = [
...@@ -94,7 +97,7 @@ target-version = "py310" ...@@ -94,7 +97,7 @@ target-version = "py310"
ban-relative-imports = "all" ban-relative-imports = "all"
[tool.ruff.per-file-ignores] [tool.ruff.per-file-ignores]
"test/*" = ["S101", "B033", "N816", "N802"] "test/*" = ["B033", "N816", "N802"]
[tool.ruff.isort] [tool.ruff.isort]
known-third-party = ["pydantic", "migrations"] known-third-party = ["pydantic", "migrations"]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment