diff --git a/gso/services/ipam.py b/gso/services/ipam.py index 6299c9c401e2cf0352825bf5ab2c6d70a635acdf..05fa7a4531880fac2be5687680fe8268b28e30da 100644 --- a/gso/services/ipam.py +++ b/gso/services/ipam.py @@ -87,7 +87,7 @@ def assert_host_in_service( oss_ipv4_containers = None, oss_ipv6_containers = None, oss_ipv4_networks = None, - oss_ipv6_networks = None, + oss_ipv6_networks = None ): # IPv4 if oss_ipv4_containers: @@ -110,6 +110,33 @@ def assert_host_in_service( ), "Host's IPv6 address doesn't belong to service type." +def assert_network_in_service( + ipv4_network: Optional[V4ServiceNetwork] = None, + ipv6_network: Optional[V6ServiceNetwork] = None, + oss_ipv4_containers = None, + oss_ipv6_containers = None, + oss_ipv4_networks = None, + oss_ipv6_networks = None +): + # IPv4 + if ipv4_network: + if oss_ipv4_containers: + assert any( + ipv4_network.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers + ), "Network doesn't belong to service type." + else: + assert ipv4_network in oss_ipv4_networks, "Network doesn't belong to service type." + + # IPv6 + if ipv6_network: + if oss_ipv6_containers: + assert any( + ipv6_network.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers + ), "Network doesn't belong to service type." + else: + assert ipv6_network in oss_ipv6_networks, "Network doesn't belong to service type." + + def find_networks(network_container: Optional[str] = "", network: Optional[str] = "", ip_version: int = 4): """If network_container is not None, find all networks within the specified container. Otherwise, if network is not None, find the specified network. @@ -126,7 +153,7 @@ def find_networks(network_container: Optional[str] = "", network: Optional[str] if network_container: params = {"network_container": network_container} elif network: - params = {"network": network} + params = {"network": network, "_return_fields": "comment"} r = requests.get( f"{wapi(infoblox_params)}/{endpoint}", params=params, @@ -281,7 +308,7 @@ def allocate_host_inner( cname_aliases = [] if extattrs is None: extattrs = {} - assert addrs or networks, "You must specify either the host addresses or the networks CIDR." + assert addrs or networks, "Neither networks nor host addresses could be derived to allocate host." oss = settings.load_oss_params() assert oss.IPAM.INFOBLOX infoblox_params = oss.IPAM.INFOBLOX @@ -454,19 +481,11 @@ def allocate_host( ipv6_network = str(oss_ipv6_networks[ipv6_network_index]) elif service_networks: - # IPv4 ipv4_network = service_networks.v4 - if oss_ipv4_containers: - assert any(ipv4_network.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers) - else: - assert ipv4_network in oss_ipv4_networks - - # IPv6 ipv6_network = service_networks.v6 - if oss_ipv6_containers: - assert any(ipv6_network.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers) - else: - assert ipv6_network in oss_ipv6_networks + assert_network_in_service( + ipv4_network, ipv6_network, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks + ) host = allocate_host_inner( hostname=hostname + domain_name, @@ -476,7 +495,7 @@ def allocate_host( extattrs=extattrs, ) assert "NETWORK_FULL" not in host, "Network is full." - assert "NETWORK_NOT_FOUND" not in host, "Network does not exist. Create it first." + assert "NETWORK_NOT_FOUND" not in host, "Network does not exist in IPAM. Create it first." elif host_addresses: ipv4_addr = host_addresses.v4 @@ -508,35 +527,30 @@ def delete_network( assert ipam_params.INFOBLOX infoblox_params = ipam_params.INFOBLOX + assert network, "No network specified to delete." assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." ip_version = ip_network_version(str(network)) # Ensure that the network to be deleted is under the service type. # Otherwise user is not allowed to delete it + oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers + oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers + oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks + oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks + ipv4_network = None + ipv6_network = None if ip_version == 4: - oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers - oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks - if oss_ipv4_containers: - assert any( - network.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers - ), "Can't delete: network doesn't belong to service type." - else: - assert network in oss_ipv4_networks, "Can't delete: network doesn't belong to service type." - + ipv4_network = network else: - oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers - oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks - if oss_ipv6_containers: - assert any( - network.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers - ), "Can't delete: network doesn't belong to service type." - else: - assert network in oss_ipv6_networks, "Can't delete: network doesn't belong to service type." + ipv6_network = network + assert_network_in_service( + ipv4_network, ipv6_network, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks + ) network_info = find_networks(network=str(network), ip_version=ip_version) - assert len(network_info) == 1, "Network does not exist." - assert "_ref" in network_info[0] + assert len(network_info) == 1, "Network to delete does not exist in IPAM." + assert "_ref" in network_info[0], "Network to delete does not exist in IPAM." r = requests.delete( f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}', @@ -573,6 +587,7 @@ def delete_host( assert ipam_params.INFOBLOX infoblox_params = ipam_params.INFOBLOX + assert host_addresses, "No host specified to delete." assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers @@ -606,8 +621,8 @@ def delete_host( timeout=REQUESTS_TIMEOUT ) host_data = r.json() - assert len(host_data) == 1, "Host does not exist." - assert "_ref" in host_data[0] + assert len(host_data) == 1, "Host to delete does not exist in IPAM." + assert "_ref" in host_data[0], "Host to delete does not exist in IPAM." host_ref = host_data[0]["_ref"] # Find cname records reference @@ -647,3 +662,23 @@ def delete_host( assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" return host_addresses + + +def validate_network( + gso_subscription_id: str = "", + network: Union[V4ServiceNetwork, V6ServiceNetwork] = None, + service_type: str = "" +) -> Union[V4ServiceNetwork, V6ServiceNetwork]: + """Validate IPv4 or IPv6 network. + Check if the specified network exist, and, if it does, + check if its comment field contains gso_subscription_id. + """ + assert network, "No network specified to validate." + + ip_version = ip_network_version(str(network)) + network_info = find_networks(network=str(network), ip_version=ip_version) + assert len(network_info) == 1 and "_ref" in network_info[0], "Network to validate not found in IPAM." + assert "comment" in network_info[0], "Network to validate does not have comment in IPAM." + assert gso_subscription_id in network_info[0]["comment"], "GSO subscription ID does not match the one in the comment field of the IPAM network." + + return network