From d038748835b0ef452436144767c989d5f815875e Mon Sep 17 00:00:00 2001 From: Ubuntu <jorge.sasiain@ehu.eus> Date: Tue, 2 May 2023 16:39:04 +0000 Subject: [PATCH] NAT-152: minor refactoring/rearrangement --- gso/services/_ipam.py | 117 ++++++++++++++++++++++-------------------- 1 file changed, 62 insertions(+), 55 deletions(-) diff --git a/gso/services/_ipam.py b/gso/services/_ipam.py index 0688dcad..d3478337 100644 --- a/gso/services/_ipam.py +++ b/gso/services/_ipam.py @@ -65,25 +65,6 @@ def _ip_network_version(network): assert ip_version in [4, 6] return ip_version -def _find_containers(network=None, ip_version=4): - """ - If network is not None, find the container that contains specified network. - Otherwise find all containers. - """ - assert ip_version in [4, 6] - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - endpoint = 'networkcontainer' if ip_version == 4 else 'ipv6networkcontainer' - r = requests.get( - f'{_wapi(infoblox_params)}/{endpoint}', - params={'network': network} if network else None, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - return r.json() - def _find_networks(network_container=None, network=None, ip_version=4): """ If network_container is not None, find all networks within the specified container. @@ -209,37 +190,6 @@ def allocate_service_ipv6_network(service_type) -> V6ServiceNetwork: assert hasattr(ipam_params, service_type) and service_type != 'INFOBLOX', "Invalid service type." return _allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6) -def _delete_network(network) -> Union[V4ServiceNetwork, V6ServiceNetwork]: - """ - Delete IPv4 or IPv6 network by CIDR. - """ - # TODO: should we check that there are no hosts in this network before deleting? - # Deleting a network deletes the hosts in it, but not the associated DNS records. - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - ip_version = _ip_network_version(network) - - network_info = _find_networks(network=network, ip_version=ip_version) - assert len(network_info) == 1, "Network does not exist." - assert '_ref' in network_info[0] - - r = requests.delete( - f'{_wapi(infoblox_params)}/{network_info[0]["_ref"]}', - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - # Extract ipv4/ipv6 address from the network reference obtained in the response - r_json = r.json() - network_address = ipaddress.ip_network(r_json.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) - if ip_version == 4: - return V4ServiceNetwork(v4=network_address) - else: - return V6ServiceNetwork(v6=network_address) - def _find_next_available_ip(infoblox_params, network_ref): r = requests.post( f'{_wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1', @@ -344,8 +294,9 @@ def allocate_service_host(hostname=None, service_type=None, service_networks: Se """ Allocate host with both IPv4 and IPv6 address (and respective DNS records). The domain name is also taken from the service type and appended to specified hostname. - If service_networks is not provided, the first network with available space for this service type is used. If service_networks is provided, that one is used. + If service_networks is not provided, the first network with available space for this service type is used. + Note that if WFO will always specify the network after creating it, this mode won't be needed. """ oss = settings.load_oss_params() assert oss.IPAM @@ -373,8 +324,9 @@ def allocate_service_host(hostname=None, service_type=None, service_networks: Se assert first_nonfull_ipv4_network, "No available IPv4 addresses for this service type." v4_host = _allocate_host(hostname=hostname+domain_name, network=first_nonfull_ipv4_network) else: - network = str(service_networks.v4) - v4_host = _allocate_host(hostname=hostname+domain_name, network=network) + network = service_networks.v4 + assert network.subnet_of(ipv4_container) + v4_host = _allocate_host(hostname=hostname+domain_name, network=str(network)) # IPv6 if not service_networks: @@ -386,11 +338,66 @@ def allocate_service_host(hostname=None, service_type=None, service_networks: Se # TODO: if "no available IP" error, create a new network? v6_host = _allocate_host(hostname=hostname+domain_name, network=ipv6_networks_info[0]['network']) else: - network = str(service_networks.v6) - v6_host = _allocate_host(hostname=hostname+domain_name, network=network) + network = service_networks.v6 + assert network.subnet_of(ipv6_container) + v6_host = _allocate_host(hostname=hostname+domain_name, network=str(network)) return HostAddresses(v4=v4_host.v4,v6=v6_host.v6) +""" +Below methods are not used for supported outside calls +""" + +def _find_containers(network=None, ip_version=4): + """ + If network is not None, find that container. + Otherwise find all containers. + """ + assert ip_version in [4, 6] + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + endpoint = 'networkcontainer' if ip_version == 4 else 'ipv6networkcontainer' + r = requests.get( + f'{_wapi(infoblox_params)}/{endpoint}', + params={'network': network} if network else None, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + return r.json() + +def _delete_network(network) -> Union[V4ServiceNetwork, V6ServiceNetwork]: + """ + Delete IPv4 or IPv6 network by CIDR. + """ + # TODO: should we check that there are no hosts in this network before deleting? + # Deleting a network deletes the hosts in it, but not the associated DNS records. + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + + ip_version = _ip_network_version(network) + + network_info = _find_networks(network=network, ip_version=ip_version) + assert len(network_info) == 1, "Network does not exist." + assert '_ref' in network_info[0] + + r = requests.delete( + f'{_wapi(infoblox_params)}/{network_info[0]["_ref"]}', + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + # Extract ipv4/ipv6 address from the network reference obtained in the response + r_json = r.json() + network_address = ipaddress.ip_network(r_json.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) + if ip_version == 4: + return V4ServiceNetwork(v4=network_address) + else: + return V6ServiceNetwork(v6=network_address) + def _delete_host_by_ip(addr) -> Union[V4HostAddress, V6HostAddress]: """ Delete IPv4 or IPv6 host by its address. -- GitLab