diff --git a/gso/services/_ipam.py b/gso/services/_ipam.py index 32591d0d33db773fc28cc0b7001bc0a092139675..dc7df6837e91175f5cff54d4d94d48040bb215a7 100644 --- a/gso/services/_ipam.py +++ b/gso/services/_ipam.py @@ -81,6 +81,32 @@ def _ip_network_version(network): return ip_version +def _assert_host_in_service( + ipv4_addr='', ipv6_addr='', + oss_ipv4_containers=None, oss_ipv6_containers=None, + oss_ipv4_networks=None, oss_ipv6_networks=None +): + # IPv4 + if oss_ipv4_containers: + assert any(ipv4_addr in oss_ipv4_container + for oss_ipv4_container in oss_ipv4_containers), \ + "Host's IPv4 address doesn't belong to service type." + else: + assert any(ipv4_addr in oss_ipv4_network + for oss_ipv4_network in oss_ipv4_networks), \ + "Host's IPv4 address doesn't belong to service type." + + # IPv6 + if oss_ipv6_containers: + assert any(ipv6_addr in oss_ipv6_container + for oss_ipv6_container in oss_ipv6_containers), \ + "Host's IPv6 address doesn't belong to service type." + else: + assert any(ipv6_addr in oss_ipv6_network + for oss_ipv6_network in oss_ipv6_networks), \ + "Host's IPv6 address doesn't belong to service type." + + def _find_networks(network_container=None, network=None, ip_version=4): """ If network_container is not None, find all networks within the specified @@ -248,7 +274,10 @@ def _allocate_host(hostname='', If networks is not None, allocate host in those networks. Otherwise if addrs is not None, allocate host with those addresses. hostname parameter must be full name including domain name. - Return an error string if couldn't allocate host due to network full. + Return "IPV4_NETWORK_FULL" or "IPV6_NETWORK_FULL" + if couldn't allocate host due to requested network being full. + Return "IPV4_NETWORK_NOT_FOUND" or "IPV6_NETWORK_NOT_FOUND" + if couldn't allocate host due to requested network not existing. """ # TODO: should hostnames be unique # (i.e. fail if hostname already exists in this domain/service)? @@ -473,23 +502,13 @@ def allocate_service_host(hostname='', "Network does not exist. Create it first." elif host_addresses: - # IPv4 ipv4_addr = host_addresses.v4 - if oss_ipv4_containers: - assert any(ipv4_addr in oss_ipv4_container - for oss_ipv4_container in oss_ipv4_containers) - else: - assert any(ipv4_addr in oss_ipv4_network - for oss_ipv4_network in oss_ipv4_networks) - - # IPv6 ipv6_addr = host_addresses.v6 - if oss_ipv6_containers: - assert any(ipv6_addr in oss_ipv6_container - for oss_ipv6_container in oss_ipv6_containers) - else: - assert any(ipv6_addr in oss_ipv6_network - for oss_ipv6_network in oss_ipv6_networks) + _assert_host_in_service( + ipv4_addr, ipv6_addr, + oss_ipv4_containers, oss_ipv6_containers, + oss_ipv4_networks, oss_ipv6_networks + ) host = _allocate_host( hostname=hostname+domain_name, @@ -569,73 +588,98 @@ def delete_service_network(ipnetwork=None, service_type='' return V6ServiceNetwork(v6=ipaddress.ip_network(network_address)) -# def delete_service_host( - # hostname='', - # host_addresses: HostAddresses = None, - # cname_aliases=[], - # service_type='' -# ) -> Union[V4HostAddress, V6HostAddress]: - # """ - # Delete IPv4 or IPv6 host by its address. - # """ - # oss = settings.load_oss_params() - # assert oss.IPAM.INFOBLOX - # infoblox_params = oss.IPAM.INFOBLOX - - # ip_version = _ip_addr_version(addr) - # ip_param = 'ipv4addr' if ip_version == 4 else 'ipv6addr' - - # # Find host record reference - # r = requests.get( - # f'{_wapi(infoblox_params)}/record:host', - # params={ip_param: addr}, - # auth=HTTPBasicAuth(infoblox_params.username, - # infoblox_params.password), - # verify=False - # ) - # host_data = r.json() - # assert len(host_data) == 1, "Host does not exist." - # assert '_ref' in host_data[0] - # host_ref = host_data[0]['_ref'] - - # # Delete it - # r = requests.delete( - # f'{_wapi(infoblox_params)}/{host_ref}', - # auth=HTTPBasicAuth(infoblox_params.username, - # infoblox_params.password), - # verify=False - # ) - # assert r.status_code >= 200 and r.status_code < 300, \ - # f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - # # Also find and delete the associated dns a/aaaa record - # endpoint = 'record:a' if ip_version == 4 else 'record:aaaa' - - # r = requests.get( - # f'{_wapi(infoblox_params)}/{endpoint}', - # params={ip_param: addr}, - # auth=HTTPBasicAuth(infoblox_params.username, - # infoblox_params.password), - # verify=False - # ) - # dns_data = r.json() - # assert len(dns_data) == 1, "DNS record does not exist." - # assert '_ref' in dns_data[0] - # dns_ref = dns_data[0]['_ref'] - - # r = requests.delete( - # f'{_wapi(infoblox_params)}/{dns_ref}', - # auth=HTTPBasicAuth(infoblox_params.username, - # infoblox_params.password), - # verify=False - # ) - # assert r.status_code >= 200 and r.status_code < 300, \ - # f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - # if ip_version == 4: - # return V4HostAddress(v4=addr) - # else: - # return V6HostAddress(v6=addr) +def delete_service_host( + hostname='', + host_addresses: HostAddresses = None, + cname_aliases=[], + service_type='' +) -> Union[V4HostAddress, V6HostAddress]: + """ + Delete host record and associated CNAME records. + All arguments passed to this function must match together a host record in + IPAM, and all CNAME records associated to it must also be passed exactly. + """ + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert ipam_params.INFOBLOX + infoblox_params = ipam_params.INFOBLOX + + assert hasattr(ipam_params, service_type) \ + and service_type != 'INFOBLOX', "Invalid service type." + oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers + oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers + oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks + oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks + domain_name = getattr(ipam_params, service_type).domain_name + dns_view = getattr(ipam_params, service_type).dns_view + ipv4_addr = str(host_addresses.v4) + ipv6_addr = str(host_addresses.v6) + + _assert_host_in_service( + host_addresses.v4, host_addresses.v6, + oss_ipv4_containers, oss_ipv6_containers, + oss_ipv4_networks, oss_ipv6_networks + ) + + # Find host record reference + r = requests.get( + f'{_wapi(infoblox_params)}/record:host', + params={ + 'name': (hostname+domain_name).lower(), # hostnames are lowercase + 'ipv4addr': ipv4_addr, + 'ipv6addr': ipv6_addr, + 'view': dns_view, + }, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + host_data = r.json() + assert len(host_data) == 1, "Host does not exist." + assert '_ref' in host_data[0] + host_ref = host_data[0]['_ref'] + + # Find cname records reference + r = requests.get( + f'{_wapi(infoblox_params)}/record:cname', + params={ + 'canonical': hostname+domain_name, + "view": dns_view, + }, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + cname_data = r.json() + provided_cnames = [item + domain_name for item in cname_aliases] + found_cnames = [item['name'] for item in cname_data if 'name' in item] + assert provided_cnames == found_cnames, \ + "Provided CNAME alias names don't match the ones poiting to hostname." + + # Delete the host record + r = requests.delete( + f'{_wapi(infoblox_params)}/{host_ref}', + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + # Delete the CNAME records + cname_refs = [item['_ref'] for item in cname_data if 'name' in item] + for cname_ref in cname_refs: + r = requests.delete( + f'{_wapi(infoblox_params)}/{cname_ref}', + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + return host_addresses """