diff --git a/gso/services/_ipam.py b/gso/services/_ipam.py index ec1dd1e86e499c0359500118ee83e691b89617a4..84d9d7e2a7f30525a375d1f92f1e4128665b42ee 100644 --- a/gso/services/_ipam.py +++ b/gso/services/_ipam.py @@ -172,9 +172,9 @@ def _allocate_network( assert 'network' in r.json() allocated_network = r.json()['network'] if ip_version == 4: - return V4ServiceNetwork(v4=allocated_network) + return V4ServiceNetwork(v4=ipaddress.ip_network(allocated_network)) else: - return V6ServiceNetwork(v6=allocated_network) + return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network)) def allocate_service_ipv4_network(service_type, comment="", extattrs={} @@ -495,6 +495,72 @@ def allocate_service_host(hostname=None, return host +def delete_service_network(network, service_type=None + ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: + """ + Delete IPv4 or IPv6 network by CIDR. + """ + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert ipam_params.INFOBLOX + infoblox_params = ipam_params.INFOBLOX + + assert hasattr(ipam_params, service_type) \ + and service_type != 'INFOBLOX', "Invalid service type." + + ip_version = _ip_network_version(network) + ipnetwork = ipaddress.ip_network(network) + + # Ensure that the network to be deleted is under the service type. + # Otherwise user is not allowed to delete it + if ip_version == 4: + oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers + oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks + if oss_ipv4_containers: + assert any(ipnetwork.subnet_of(oss_ipv4_container) + for oss_ipv4_container in oss_ipv4_containers), \ + "Can't delete: network doesn't belong to service type." + else: + assert ipnetwork in oss_ipv4_networks, \ + "Can't delete: network doesn't belong to service type." + + else: + oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers + oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks + if oss_ipv6_containers: + assert any(ipnetwork.subnet_of(oss_ipv6_container) + for oss_ipv6_container in oss_ipv6_containers), \ + "Can't delete: network doesn't belong to service type." + else: + assert ipnetwork in oss_ipv6_networks, \ + "Can't delete: network doesn't belong to service type." + + network_info = _find_networks(network=network, ip_version=ip_version) + assert len(network_info) == 1, "Network does not exist." + assert '_ref' in network_info[0] + + r = requests.delete( + f'{_wapi(infoblox_params)}/{network_info[0]["_ref"]}', + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + # Extract ipv4/ipv6 address from the network reference obtained in the + # response + r_text = r.text + print(r_text) + network_address = ipaddress.ip_network( + r_text.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) + if ip_version == 4: + return V4ServiceNetwork(v4=ipaddress.ip_network(network_address)) + else: + return V6ServiceNetwork(v6=ipaddress.ip_network(network_address)) + + """ Below methods are not used for supported outside calls """ @@ -556,43 +622,6 @@ def _get_network_capacity(network=None): return utilization -def _delete_network(network) -> Union[V4ServiceNetwork, V6ServiceNetwork]: - """ - Delete IPv4 or IPv6 network by CIDR. - """ - # TODO: should we check that there are no hosts in this network before - # deleting? Deleting a network deletes the hosts in it, but not the - # associated DNS records. - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - ip_version = _ip_network_version(network) - - network_info = _find_networks(network=network, ip_version=ip_version) - assert len(network_info) == 1, "Network does not exist." - assert '_ref' in network_info[0] - - r = requests.delete( - f'{_wapi(infoblox_params)}/{network_info[0]["_ref"]}', - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - # Extract ipv4/ipv6 address from the network reference obtained in the - # response - r_json = r.json() - network_address = ipaddress.ip_network( - r_json.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) - if ip_version == 4: - return V4ServiceNetwork(v4=network_address) - else: - return V6ServiceNetwork(v6=network_address) - - def _delete_host_by_ip(addr) -> Union[V4HostAddress, V6HostAddress]: """ Delete IPv4 or IPv6 host by its address. diff --git a/gso/services/ipam.py b/gso/services/ipam.py index 126249e90d11adc67a23b2dd839167fd0cbe985c..cd8d2588d8a7953a6721820370aebd189a93c736 100644 --- a/gso/services/ipam.py +++ b/gso/services/ipam.py @@ -1,5 +1,6 @@ import ipaddress from pydantic import BaseSettings +from typing import Union from gso.services import _ipam @@ -57,6 +58,14 @@ def new_service_host(hostname, extattrs=extattrs) +def delete_service_network(network, service_type + ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: + return _ipam.delete_service_network( + network=network, + service_type=service_type + ) + + if __name__ == '__main__': # sample call flow to allocate two loopback interfaces and a trunk service # new_service_host can be called passing networks, addresses, or nothing. @@ -74,11 +83,11 @@ if __name__ == '__main__': # loA_v4_host_address = ipaddress.ip_address('10.255.255.0') # loA_v6_host_address = ipaddress.ip_address('dead:beef::0') # loA_host_addresses = HostAddresses(v4=loA_v4_host_address, - # v6=loA_v6_host_address) + # v6=loA_v6_host_address) # new_service_host(hostname=hostname_A+"_LO", - # host_addresses=loA_host_addresses, - # cname_aliases=["alias1.hA", "alias2.hA"], - # service_type='LO') + # host_addresses=loA_host_addresses, + # cname_aliases=["alias1.hA", "alias2.hA"], + # service_type='LO') new_service_host(hostname=hostname_A+"_LO", cname_aliases=["alias1.hA", "alias2.hA"], service_type='LO') diff --git a/test/test_ipam.py b/test/test_ipam.py index c72ba2f558d0e209b097a57dcc9ff857decb222b..e839c00ae6db7e2a2d81ddb4264c622f140a43c7 100644 --- a/test/test_ipam.py +++ b/test/test_ipam.py @@ -70,7 +70,6 @@ def test_new_service_host(data_config_filename): "network_view": "default" } ] - ) responses.add( @@ -83,7 +82,6 @@ def test_new_service_host(data_config_filename): "network_view": "default" } ] - ) responses.add( @@ -194,3 +192,97 @@ def test_new_service_host(data_config_filename): ) ) assert service_hosts is None + + +@responses.activate +def test_delete_service_network(data_config_filename): + + responses.add( + method=responses.GET, + url=re.compile(r'.*/wapi.*/network.*10.255.255.0.*'), + json=[ + { + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 + "network": "10.255.255.0/26", + "network_view": "default" + } + ] + ) + + responses.add( + method=responses.GET, + url=re.compile(r'.*/wapi.*/network.*10.255.255.20.*'), + json=[ + { + "_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501 + "network": "100.255.255.20/32", + "network_view": "default" + } + ] + ) + + responses.add( + method=responses.GET, + url=re.compile(r'.*/wapi.*/ipv6network.*dead.*beef.*'), + json=[ + { + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + "network": "dead:beef::18/128", + "network_view": "default" + } + ] + ) + + responses.add( + method=responses.GET, + url=re.compile(r'.*/wapi.*/ipv6network.*beef.*dead.*'), + json=[ + { + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501 + "network": "beef:dead::18/128", + "network_view": "default" + } + ] + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r'.*/wapi.*/network.*10.255.255.0.*'), + body="network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default" # noqa: E501 + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r'.*/wapi.*/network.*100.255.255.*'), + body="network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default" # noqa: E501 + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r'.*/wapi.*/ipv6network.*dead.*beef.*'), + body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default" # noqa: E501 + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r'.*/wapi.*/ipv6network.*beef.*dead.*'), + body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default" # noqa: E501 + ) + + service_network = ipam.delete_service_network(network='10.255.255.0/26', service_type='LO') # noqa: E501 + assert service_network == ipam.V4ServiceNetwork( + v4=ipaddress.ip_network('10.255.255.0/26') + ) + + with pytest.raises(AssertionError): + service_network = ipam.delete_service_network(network='10.255.255.20/32', service_type='LO') # noqa: E501 + assert service_network is None + + service_network = ipam.delete_service_network(network='dead:beef::18/128', service_type='TRUNK') # noqa: E501 + assert service_network == ipam.V6ServiceNetwork( + v6=ipaddress.ip_network('dead:beef::18/128') + ) + + with pytest.raises(AssertionError): + service_network = ipam.delete_service_network(network='beef:dead::18/128', service_type='TRUNK') # noqa: E501 + assert service_network is None