diff --git a/gso/services/ipam.py b/gso/services/ipam.py index 05fa7a4531880fac2be5687680fe8268b28e30da..a1bbab8d26a3065e708a23e6c66ed8139b6a1ee7 100644 --- a/gso/services/ipam.py +++ b/gso/services/ipam.py @@ -574,7 +574,7 @@ def delete_host( host_addresses: HostAddresses = None, cname_aliases: Optional[list] = None, service_type: str = "" -) -> Union[V4HostAddress, V6HostAddress]: +) -> HostAddresses: """Delete host record and associated CNAME records. All arguments passed to this function must match together a host record in IPAM, and all CNAME records associated to it must also be passed exactly. @@ -672,6 +672,7 @@ def validate_network( """Validate IPv4 or IPv6 network. Check if the specified network exist, and, if it does, check if its comment field contains gso_subscription_id. + Returns the network if validation successful. """ assert network, "No network specified to validate." @@ -682,3 +683,64 @@ def validate_network( assert gso_subscription_id in network_info[0]["comment"], "GSO subscription ID does not match the one in the comment field of the IPAM network." return network + + +def validate_host( + hostname: str = "", + host_addresses: HostAddresses = None, + cname_aliases: Optional[list] = None, + service_type: str = "" +) -> HostAddresses: + """Validate host. + Check if all arguments passed to this function match together a host record in + IPAM, and all CNAME records associated to it also match exactly. + Returns the host if validation successful. + """ + if cname_aliases is None: + cname_aliases = [] + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert ipam_params.INFOBLOX + infoblox_params = ipam_params.INFOBLOX + + assert hostname and host_addresses, "No host specified to validate. Either hostname or host_addresses missing." + domain_name = getattr(ipam_params, service_type).domain_name + ipv4_addr = str(host_addresses.v4) + ipv6_addr = str(host_addresses.v6) + dns_view = getattr(ipam_params, service_type).dns_view + + # Find host record reference + r = requests.get( + f"{wapi(infoblox_params)}/record:host", + params={ + "name": (hostname + domain_name).lower(), # hostnames are lowercase + "ipv4addr": ipv4_addr, + "ipv6addr": ipv6_addr, + "view": dns_view, + }, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, + timeout=REQUESTS_TIMEOUT + ) + host_data = r.json() + assert len(host_data) == 1, "Host to validate does not exist in IPAM." + assert "_ref" in host_data[0], "Host to validate does not exist in IPAM." + + # Find cname records reference + r = requests.get( + f"{wapi(infoblox_params)}/record:cname", + params={ + "canonical": hostname + domain_name, + "view": dns_view, + }, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, + timeout=REQUESTS_TIMEOUT + ) + cname_data = r.json() + provided_cnames = [item + domain_name for item in cname_aliases] + found_cnames = [item["name"] for item in cname_data if "name" in item] + assert provided_cnames == found_cnames, "Provided CNAME alias names don't match the ones poiting to hostname." + + return HostAddresses