From 53123d2dc1afb7b370128f0fb15ec8e8192fea02 Mon Sep 17 00:00:00 2001 From: Ubuntu <jorge.sasiain@ehu.eus> Date: Wed, 3 May 2023 17:38:55 +0000 Subject: [PATCH] NAT-152: fix all tox errors except lines too long --- gso/services/_ipam.py | 43 +++++++++++++++++++++++++------------------ gso/services/ipam.py | 3 +-- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/gso/services/_ipam.py b/gso/services/_ipam.py index 2cf73d27..5a91e31e 100644 --- a/gso/services/_ipam.py +++ b/gso/services/_ipam.py @@ -1,6 +1,5 @@ import ipaddress import json -import random import requests from enum import Enum from pydantic import BaseSettings @@ -37,9 +36,10 @@ class HostAddresses(BaseSettings): class IPAMErrors(Enum): -# HTTP error code, match in error message + # HTTP error code, match in error message CONTAINER_FULL = 400, "Can not find requested number of networks" + # TODO: remove this! # lab infoblox cert is not valid for the ipv4 address # ... disable warnings for now @@ -49,10 +49,12 @@ requests.packages.urllib3.disable_warnings() def _match_error_code(response, error_code): return response.status_code == error_code.value[0] and error_code.value[1] in response.text + def _wapi(infoblox_params: settings.InfoBloxParams): return (f'https://{infoblox_params.host}' f'/wapi/{infoblox_params.wapi_version}') + def _ip_addr_version(addr): ip_version = None ip_addr = ipaddress.ip_address(addr) @@ -63,6 +65,7 @@ def _ip_addr_version(addr): assert ip_version in [4, 6] return ip_version + def _ip_network_version(network): ip_version = None ip_network = ipaddress.ip_network(network) @@ -73,6 +76,7 @@ def _ip_network_version(network): assert ip_version in [4, 6] return ip_version + def _find_networks(network_container=None, network=None, ip_version=4): """ If network_container is not None, find all networks within the specified container. @@ -86,9 +90,9 @@ def _find_networks(network_container=None, network=None, ip_version=4): endpoint = 'network' if ip_version == 4 else 'ipv6network' params = None if network_container: - params={'network_container': network_container} + params = {'network_container': network_container} elif network: - params={'network': network} + params = {'network': network} r = requests.get( f'{_wapi(infoblox_params)}/{endpoint}', params=params, @@ -99,6 +103,7 @@ def _find_networks(network_container=None, network=None, ip_version=4): assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" return r.json() + def _get_network_capacity(network=None): """ Get utilization of a IPv4 network in a fraction of 1000. @@ -109,7 +114,7 @@ def _get_network_capacity(network=None): ip_version = _ip_network_version(network) assert ip_version == 4, "Utilization is only available for IPv4 networks." - params={'network': network, '_return_fields': 'network,total_hosts,utilization'} + params = {'network': network, '_return_fields': 'network,total_hosts,utilization'} r = requests.get( f'{_wapi(infoblox_params)}/network', @@ -141,7 +146,7 @@ def _allocate_network(infoblox_params: settings.InfoBloxParams, network_params: "_object_parameters": { "network": str(network_params.containers[0]) }, - "_result_field": "networks", # only return in the response the allocated network, not all available + "_result_field": "networks", # only return in the response the allocated network, not all available } } @@ -167,11 +172,12 @@ def _allocate_network(infoblox_params: settings.InfoBloxParams, network_params: assert 'network' in r.json() allocated_network = r.json()['network'] - if ip_version==4: + if ip_version == 4: return V4ServiceNetwork(v4=allocated_network) else: return V6ServiceNetwork(v6=allocated_network) + def allocate_service_ipv4_network(service_type) -> V4ServiceNetwork: """ Allocate IPv4 network within the container of the specified service type. @@ -182,6 +188,7 @@ def allocate_service_ipv4_network(service_type) -> V4ServiceNetwork: assert hasattr(ipam_params, service_type) and service_type != 'INFOBLOX', "Invalid service type." return _allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4) + def allocate_service_ipv6_network(service_type) -> V6ServiceNetwork: """ Allocate IPv6 network within the container of the specified service type. @@ -192,6 +199,7 @@ def allocate_service_ipv6_network(service_type) -> V6ServiceNetwork: assert hasattr(ipam_params, service_type) and service_type != 'INFOBLOX', "Invalid service type." return _allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6) + def _find_next_available_ip(infoblox_params, network_ref): r = requests.post( f'{_wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1', @@ -205,6 +213,7 @@ def _find_next_available_ip(infoblox_params, network_ref): assert len(received_ip) == 1 return received_ip[0] + def _allocate_host(hostname=None, addr=None, network=None) -> Union[V4HostAddress, V6HostAddress]: """ If network is not None, allocate host in that network. @@ -249,20 +258,13 @@ def _allocate_host(hostname=None, addr=None, network=None) -> Union[V4HostAddres assert isinstance(r.json(), str) assert r.json().startswith("record:host/") - a_record_payload = { - "ipv4addr": addr, - "name": hostname, - "view": "default" - } - - aaaa_record_payload = { - "ipv6addr": addr, + dns_req_payload = { + f"ipv{ip_req_payload}addr": addr, "name": hostname, "view": "default" } endpoint = 'record:a' if ip_version == 4 else 'record:aaaa' - dns_req_payload = a_record_payload if ip_version == 4 else aaaa_record_payload r = requests.post( f'{_wapi(infoblox_params)}/{endpoint}', @@ -279,6 +281,7 @@ def _allocate_host(hostname=None, addr=None, network=None) -> Union[V4HostAddres else: return V6HostAddress(v6=addr) + def allocate_service_host(hostname=None, service_type=None, service_networks: ServiceNetworks = None, host_addresses: HostAddresses = None) -> HostAddresses: """ Allocate host with both IPv4 and IPv6 address (and respective DNS records). @@ -323,7 +326,6 @@ def allocate_service_host(hostname=None, service_type=None, service_networks: Se assert any(addr in ipv4_container for ipv4_container in ipv4_containers) v4_host = _allocate_host(hostname=hostname+domain_name, addr=str(addr)) - # IPv6 if not service_networks and not host_addresses: # ipv6 does not support capacity fetching (not even the GUI displays it). @@ -342,12 +344,14 @@ def allocate_service_host(hostname=None, service_type=None, service_networks: Se assert any(addr in ipv6_container for ipv6_container in ipv6_containers) v6_host = _allocate_host(hostname=hostname+domain_name, addr=str(addr)) - return HostAddresses(v4=v4_host.v4,v6=v6_host.v6) + return HostAddresses(v4=v4_host.v4, v6=v6_host.v6) + """ Below methods are not used for supported outside calls """ + def _find_containers(network=None, ip_version=4): """ If network is not None, find that container. @@ -367,6 +371,7 @@ def _find_containers(network=None, ip_version=4): assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" return r.json() + def _delete_network(network) -> Union[V4ServiceNetwork, V6ServiceNetwork]: """ Delete IPv4 or IPv6 network by CIDR. @@ -398,6 +403,7 @@ def _delete_network(network) -> Union[V4ServiceNetwork, V6ServiceNetwork]: else: return V6ServiceNetwork(v6=network_address) + def _delete_host_by_ip(addr) -> Union[V4HostAddress, V6HostAddress]: """ Delete IPv4 or IPv6 host by its address. @@ -455,6 +461,7 @@ def _delete_host_by_ip(addr) -> Union[V4HostAddress, V6HostAddress]: else: return V6HostAddress(v6=addr) + if __name__ == '__main__': while True: print("1. Find all containers") diff --git a/gso/services/ipam.py b/gso/services/ipam.py index 92e66a0e..d93983f1 100644 --- a/gso/services/ipam.py +++ b/gso/services/ipam.py @@ -1,7 +1,6 @@ import ipaddress from pydantic import BaseSettings -from gso import settings import _ipam @@ -63,4 +62,4 @@ if __name__ == '__main__': # h1-h2 trunk trunk12_service_networks = new_service_networks('TRUNK') new_service_host(hostname_A, 'TRUNK', trunk12_service_networks) - new_service_host(hostname_B, 'TRUNK', trunk12_service_networks) \ No newline at end of file + new_service_host(hostname_B, 'TRUNK', trunk12_service_networks) -- GitLab