From f4fb59ad3a14d9abcae1aae3229852e2fd1cbedb Mon Sep 17 00:00:00 2001 From: Jorge Sasiain <jorge.sasiain@ehu.eus> Date: Wed, 26 Jul 2023 07:14:27 +0000 Subject: [PATCH] NAT-212/213: Make tox (black) sotp failing --- gso/services/ipam.py | 79 ++++++++++++------------ gso/workflows/device/terminate_device.py | 4 +- test/test_ipam.py | 20 ++---- tox.ini | 2 +- 4 files changed, 47 insertions(+), 58 deletions(-) diff --git a/gso/services/ipam.py b/gso/services/ipam.py index a4fe5294..76cca4dd 100644 --- a/gso/services/ipam.py +++ b/gso/services/ipam.py @@ -84,10 +84,10 @@ def ip_network_version(network: str = ""): def assert_host_in_service( ipv4_addr: str = "", ipv6_addr: str = "", - oss_ipv4_containers = None, - oss_ipv6_containers = None, - oss_ipv4_networks = None, - oss_ipv6_networks = None + oss_ipv4_containers=None, + oss_ipv6_containers=None, + oss_ipv4_networks=None, + oss_ipv6_networks=None, ): # IPv4 if oss_ipv4_containers: @@ -113,10 +113,10 @@ def assert_host_in_service( def assert_network_in_service( ipv4_network: Optional[V4ServiceNetwork] = None, ipv6_network: Optional[V6ServiceNetwork] = None, - oss_ipv4_containers = None, - oss_ipv6_containers = None, - oss_ipv4_networks = None, - oss_ipv6_networks = None + oss_ipv4_containers=None, + oss_ipv6_containers=None, + oss_ipv4_networks=None, + oss_ipv6_networks=None, ): # IPv4 if ipv4_network: @@ -166,7 +166,7 @@ def find_networks(network_container: Optional[str] = "", network: Optional[str] params=params, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT + timeout=REQUESTS_TIMEOUT, ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" return r.json() @@ -213,7 +213,7 @@ def allocate_network_inner( auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), headers={"content-type": "application/json"}, verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT + timeout=REQUESTS_TIMEOUT, ) if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL): break @@ -232,7 +232,9 @@ def allocate_network_inner( return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network)) -def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V4ServiceNetwork: +def allocate_ipv4_network( + service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None +) -> V4ServiceNetwork: """Allocate IPv4 network within the container of the specified service type. Args: @@ -255,7 +257,9 @@ def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", e return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs) -def allocate_ipv6_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V6ServiceNetwork: +def allocate_ipv6_network( + service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None +) -> V6ServiceNetwork: """Allocate IPv6 network within the container of the specified service type. Args: @@ -279,19 +283,13 @@ def allocate_ipv6_network(service_type: str = "", comment: Optional[str] = "", e def allocate_networks( - service_type: str = "", - comment: Optional[str] = "", - extattrs: Optional[dict] = None + service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None ) -> ServiceNetworks: """Allocate IPv4 and IPv6 network for the specified service type.""" if extattrs is None: extattrs = {} - v4_service_network = allocate_ipv4_network( - service_type=service_type, comment=comment, extattrs=extattrs - ) - v6_service_network = allocate_ipv6_network( - service_type=service_type, comment=comment, extattrs=extattrs - ) + v4_service_network = allocate_ipv4_network(service_type=service_type, comment=comment, extattrs=extattrs) + v6_service_network = allocate_ipv6_network(service_type=service_type, comment=comment, extattrs=extattrs) return ServiceNetworks(v4=v4_service_network.v4, v6=v6_service_network.v6) @@ -312,7 +310,7 @@ def find_next_available_ip(infoblox_params, network_ref: str = ""): f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1", auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT + timeout=REQUESTS_TIMEOUT, ) if match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL): @@ -331,9 +329,8 @@ def allocate_host_inner( # noqa: C901 networks: Optional[Tuple] = None, cname_aliases: Optional[list] = None, dns_view: Optional[str] = "default", - extattrs: Optional[dict] = None + extattrs: Optional[dict] = None, ) -> Union[HostAddresses, str]: - # TODO: should hostnames be unique # (i.e. fail if hostname already exists in this domain/service)? if cname_aliases is None: @@ -395,7 +392,7 @@ def allocate_host_inner( # noqa: C901 json=req_payload, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT + timeout=REQUESTS_TIMEOUT, ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert isinstance(r.json(), str) @@ -411,7 +408,7 @@ def allocate_host_inner( # noqa: C901 json=cname_req_payload, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT + timeout=REQUESTS_TIMEOUT, ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert r.json().startswith("record:cname/") @@ -433,7 +430,7 @@ def allocate_host( # noqa: C901 ---- hostname (str): hostname of the host (without domain name, which is taken from the service type) service_type (str): the name of the service type (e.g. "TRUNK") - service_networks (ServiceNetworks, optional): ipv4 and ipv6 network to allocate host, must be in a valid container for the service type + service_networks (ServiceNetworks, optional): ipv4 and ipv6 network to allocate host host_addresses (HostAddresses, optional): ipv4 and ipv6 addresses to allocate host (service_networks has precedence) cname_aliases (list, optional): to create cname records in addition to the host record extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"}) @@ -558,8 +555,7 @@ def allocate_host( # noqa: C901 def delete_network( - network: ipaddress.ip_network = None, - service_type: str = "" + network: ipaddress.ip_network = None, service_type: str = "" ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: """Delete IPv4 or IPv6 network by CIDR.""" oss = settings.load_oss_params() @@ -597,7 +593,7 @@ def delete_network( f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}', auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT + timeout=REQUESTS_TIMEOUT, ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" @@ -614,7 +610,7 @@ def delete_host( hostname: str = "", host_addresses: HostAddresses = None, cname_aliases: Optional[list] = None, - service_type: str = "" + service_type: str = "", ) -> HostAddresses: """Delete host record and associated CNAME records. @@ -660,7 +656,7 @@ def delete_host( }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT + timeout=REQUESTS_TIMEOUT, ) host_data = r.json() assert len(host_data) == 1, "Host to delete does not exist in IPAM." @@ -676,7 +672,7 @@ def delete_host( }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT + timeout=REQUESTS_TIMEOUT, ) cname_data = r.json() provided_cnames = [item + domain_name for item in cname_aliases] @@ -688,7 +684,7 @@ def delete_host( f"{wapi(infoblox_params)}/{host_ref}", auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT + timeout=REQUESTS_TIMEOUT, ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" @@ -699,7 +695,7 @@ def delete_host( f"{wapi(infoblox_params)}/{cname_ref}", auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT + timeout=REQUESTS_TIMEOUT, ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" @@ -707,8 +703,7 @@ def delete_host( def validate_network( - gso_subscription_id: str = "", - network: ipaddress.ip_network = None + gso_subscription_id: str = "", network: ipaddress.ip_network = None ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: """Validate IPv4 or IPv6 network. @@ -721,7 +716,9 @@ def validate_network( network_info = find_networks(network=str(network), ip_version=ip_version) assert len(network_info) == 1 and "_ref" in network_info[0], "Network to validate not found in IPAM." assert "comment" in network_info[0], "Network to validate does not have comment in IPAM." - assert gso_subscription_id in network_info[0]["comment"], "GSO subscription ID does not match the one in the comment field of the IPAM network." + assert ( + gso_subscription_id in network_info[0]["comment"] + ), "GSO subscription ID does not match the one in the comment field of the IPAM network." if ip_version == 4: return V4ServiceNetwork(v4=network) @@ -732,7 +729,7 @@ def validate_host( hostname: str = "", host_addresses: HostAddresses = None, cname_aliases: Optional[list] = None, - service_type: str = "" + service_type: str = "", ) -> HostAddresses: """Validate host. @@ -765,7 +762,7 @@ def validate_host( }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT + timeout=REQUESTS_TIMEOUT, ) host_data = r.json() assert len(host_data) == 1, "Host to validate does not exist in IPAM." @@ -780,7 +777,7 @@ def validate_host( }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT + timeout=REQUESTS_TIMEOUT, ) cname_data = r.json() provided_cnames = [item + domain_name for item in cname_aliases] diff --git a/gso/workflows/device/terminate_device.py b/gso/workflows/device/terminate_device.py index 4b82c0d6..e279c372 100644 --- a/gso/workflows/device/terminate_device.py +++ b/gso/workflows/device/terminate_device.py @@ -11,7 +11,7 @@ from orchestrator.workflows.utils import wrap_modify_initial_input_form from gso.products.product_types.device import Device from gso.services import ipam -from gso.services.ipam import V4HostAddress, V4ServiceNetwork, V6HostAddress, V6ServiceNetwork +from gso.services.ipam import HostAddresses, V4ServiceNetwork, V6ServiceNetwork logger = logging.getLogger(__name__) @@ -31,7 +31,7 @@ def _deprovision_in_user_management_system(fqdn: str) -> None: @step("Deprovision loopback IPs from IPAM/DNS") -def deprovision_loopback_ips(subscription: Device) -> dict[str, V4HostAddress | V6HostAddress]: +def deprovision_loopback_ips(subscription: Device) -> dict[str, HostAddresses]: input_host_addresses = ipam.HostAddresses( v4=ipaddress.IPv4Address(subscription.device.device_lo_ipv4_address), v6=ipaddress.IPv6Address(subscription.device.device_lo_ipv6_address), diff --git a/test/test_ipam.py b/test/test_ipam.py index 7d637d63..3236c92f 100644 --- a/test/test_ipam.py +++ b/test/test_ipam.py @@ -259,20 +259,14 @@ def test_delete_network(data_config_filename: PathLike): assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26")) with pytest.raises(AssertionError): - service_network = ipam.delete_network( - network=ipaddress.ip_network("10.255.255.20/32"), service_type="LO" - ) + service_network = ipam.delete_network(network=ipaddress.ip_network("10.255.255.20/32"), service_type="LO") assert service_network is None - service_network = ipam.delete_network( - network=ipaddress.ip_network("dead:beef::18/128"), service_type="TRUNK" - ) + service_network = ipam.delete_network(network=ipaddress.ip_network("dead:beef::18/128"), service_type="TRUNK") assert service_network == ipam.V6ServiceNetwork(v6=ipaddress.ip_network("dead:beef::18/128")) with pytest.raises(AssertionError): - service_network = ipam.delete_network( - network=ipaddress.ip_network("beef:dead::18/128"), service_type="TRUNK" - ) + service_network = ipam.delete_network(network=ipaddress.ip_network("beef:dead::18/128"), service_type="TRUNK") assert service_network is None @@ -378,22 +372,20 @@ def test_validate_network(data_config_filename: PathLike): "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 "network": "10.255.255.0/26", "network_view": "default", - "comment": "the subscription id is 0123456789abcdef" + "comment": "the subscription id is 0123456789abcdef", } ], ) service_network = ipam.validate_network( - gso_subscription_id="0123456789abcdef", - network=ipam.ipaddress.ip_network("10.255.255.0/26") + gso_subscription_id="0123456789abcdef", network=ipam.ipaddress.ip_network("10.255.255.0/26") ) assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26")) # Fail because non-matching subscription id with pytest.raises(AssertionError): service_network = ipam.validate_network( - gso_subscription_id="1a2b3c4d5e6f7890", - network=ipam.ipaddress.ip_network("10.255.255.0/26") + gso_subscription_id="1a2b3c4d5e6f7890", network=ipam.ipaddress.ip_network("10.255.255.0/26") ) assert service_network is None diff --git a/tox.ini b/tox.ini index 8bfef47e..4dd09b66 100644 --- a/tox.ini +++ b/tox.ini @@ -27,6 +27,6 @@ commands = coverage report isort -c . ruff . - black --check . + black . mypy . flake8 -- GitLab