# mypy: ignore-errors import ipaddress from enum import Enum from typing import Optional, Tuple, Union import requests from pydantic import BaseSettings from requests.auth import HTTPBasicAuth from gso import settings class V4ServiceNetwork(BaseSettings): v4: ipaddress.IPv4Network class V6ServiceNetwork(BaseSettings): v6: ipaddress.IPv6Network class ServiceNetworks(BaseSettings): v4: ipaddress.IPv4Network v6: ipaddress.IPv6Network class V4HostAddress(BaseSettings): v4: ipaddress.IPv4Address class V6HostAddress(BaseSettings): v6: ipaddress.IPv6Address class HostAddresses(BaseSettings): v4: ipaddress.IPv4Address v6: ipaddress.IPv6Address class IPAMErrors(Enum): # HTTP error code, match in error message CONTAINER_FULL = 400, "Can not find requested number of networks" NETWORK_FULL = 400, "Cannot find 1 available IP address(es) in this network" REQUESTS_TIMEOUT = 20 # TODO: remove this! # lab infoblox cert isn't valid for the ipv4 address # disable warnings for now requests.packages.urllib3.disable_warnings() def match_error_code(response, error_code): return response.status_code == error_code.value[0] and error_code.value[1] in response.text def wapi(infoblox_params: settings.InfoBloxParams): return f"https://{infoblox_params.host}" f"/wapi/{infoblox_params.wapi_version}" def ip_addr_version(addr: str = ""): ip_version = None ip_addr = ipaddress.ip_address(addr) if isinstance(ip_addr, ipaddress.IPv4Address): ip_version = 4 elif isinstance(ip_addr, ipaddress.IPv6Address): ip_version = 6 assert ip_version in [4, 6] return ip_version def ip_network_version(network: str = ""): ip_version = None ip_network = ipaddress.ip_network(network) if isinstance(ip_network, ipaddress.IPv4Network): ip_version = 4 elif isinstance(ip_network, ipaddress.IPv6Network): ip_version = 6 assert ip_version in [4, 6] return ip_version def assert_host_in_service( ipv4_addr: str = "", ipv6_addr: str = "", oss_ipv4_containers=None, oss_ipv6_containers=None, oss_ipv4_networks=None, oss_ipv6_networks=None, ): # IPv4 if oss_ipv4_containers: assert any( ipv4_addr in oss_ipv4_container for oss_ipv4_container in oss_ipv4_containers ), "Host's IPv4 address doesn't belong to service type." else: assert any( ipv4_addr in oss_ipv4_network for oss_ipv4_network in oss_ipv4_networks ), "Host's IPv4 address doesn't belong to service type." # IPv6 if oss_ipv6_containers: assert any( ipv6_addr in oss_ipv6_container for oss_ipv6_container in oss_ipv6_containers ), "Host's IPv6 address doesn't belong to service type." else: assert any( ipv6_addr in oss_ipv6_network for oss_ipv6_network in oss_ipv6_networks ), "Host's IPv6 address doesn't belong to service type." def assert_network_in_service( ipv4_network: Optional[V4ServiceNetwork] = None, ipv6_network: Optional[V6ServiceNetwork] = None, oss_ipv4_containers=None, oss_ipv6_containers=None, oss_ipv4_networks=None, oss_ipv6_networks=None, ): # IPv4 if ipv4_network: if oss_ipv4_containers: assert any( ipv4_network.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers ), "Network doesn't belong to service type." else: assert ipv4_network in oss_ipv4_networks, "Network doesn't belong to service type." # IPv6 if ipv6_network: if oss_ipv6_containers: assert any( ipv6_network.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers ), "Network doesn't belong to service type." else: assert ipv6_network in oss_ipv6_networks, "Network doesn't belong to service type." def find_networks(network_container: Optional[str] = "", network: Optional[str] = "", ip_version: int = 4): """Get all networks optinally filtering by a container or by a network. Args: ---- network_container (str, optional): container to filter by network (str, optional): network to filter by ip_version (int): 4 or 6 Returns: ------- (list) all found networks mathing the args, which may be empty. """ assert ip_version in [4, 6] oss = settings.load_oss_params() assert oss.IPAM.INFOBLOX infoblox_params = oss.IPAM.INFOBLOX endpoint = "network" if ip_version == 4 else "ipv6network" params = None if network_container: params = {"network_container": network_container} elif network: params = {"network": network, "_return_fields": "comment"} r = requests.get( f"{wapi(infoblox_params)}/{endpoint}", params=params, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" return r.json() def allocate_network_inner( infoblox_params: settings.InfoBloxParams, network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams], ip_version: int = 4, comment: Optional[str] = "", extattrs: Optional[dict] = None, ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: if extattrs is None: extattrs = {} assert ip_version in [4, 6] endpoint = "network" if ip_version == 4 else "ipv6network" ip_container = "networkcontainer" if ip_version == 4 else "ipv6networkcontainer" assert network_params.containers, ( "No containers available to allocate networks for this service." "Maybe you want to allocate a host from a network directly?" ) # only return in the response the allocated network, not all available # TODO: any validation needed for extrattrs wherever it's used? req_payload = { "network": { "_object_function": "next_available_network", "_parameters": {"cidr": network_params.mask}, "_object": ip_container, "_object_parameters": {"network": str(network_params.containers[0])}, "_result_field": "networks", }, "comment": comment, "extattrs": extattrs, } container_index = 0 while True: r = requests.post( f"{wapi(infoblox_params)}/{endpoint}", params={"_return_fields": "network"}, json=req_payload, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), headers={"content-type": "application/json"}, verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, ) if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL): break # Container full: try with next valid container for service (if any) container_index += 1 if len(network_params.containers) < (container_index + 1): break req_payload["network"]["_object_parameters"]["network"] = str(network_params.containers[container_index]) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert "network" in r.json() allocated_network = r.json()["network"] if ip_version == 4: return V4ServiceNetwork(v4=ipaddress.ip_network(allocated_network)) return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network)) def allocate_ipv4_network( service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None ) -> V4ServiceNetwork: """Allocate IPv4 network within the container of the specified service type. Args: ---- service_type (str): the name of the service type (e.g. "TRUNK") comment (str, optional): a custom comment to write in the comment field in IPAM extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"}) Returns: ------- (V4ServiceNetwork): the allocated network """ if extattrs is None: extattrs = {} oss = settings.load_oss_params() assert oss.IPAM ipam_params = oss.IPAM assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs) def allocate_ipv6_network( service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None ) -> V6ServiceNetwork: """Allocate IPv6 network within the container of the specified service type. Args: ---- service_type (str): the name of the service type (e.g. "TRUNK") comment (str, optional): a custom comment to write in the comment field in IPAM extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"}) Returns: ------- (V4ServiceNetwork): the allocated network """ if extattrs is None: extattrs = {} oss = settings.load_oss_params() assert oss.IPAM ipam_params = oss.IPAM assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6, comment, extattrs) def allocate_networks( service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None ) -> ServiceNetworks: """Allocate IPv4 and IPv6 network for the specified service type.""" if extattrs is None: extattrs = {} v4_service_network = allocate_ipv4_network(service_type=service_type, comment=comment, extattrs=extattrs) v6_service_network = allocate_ipv6_network(service_type=service_type, comment=comment, extattrs=extattrs) return ServiceNetworks(v4=v4_service_network.v4, v6=v6_service_network.v6) def find_next_available_ip(infoblox_params, network_ref: str = ""): """Find the next available IP address from a network given its ref. Args: ---- infoblox_params (settings.InfoBloxParams): infoblox params network_ref (str): the network to find the next available IP, in InfoBlox reference format Returns: ------- (str): next available IP in the network, or "NETWORK_FULL" if there's no space in the network """ r = requests.post( f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1", auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, ) if match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL): return "NETWORK_FULL" assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert "ips" in r.json() received_ip = r.json()["ips"] assert len(received_ip) == 1 return received_ip[0] def allocate_host_inner( # noqa: C901 hostname: str = "", addrs: Optional[Tuple] = None, networks: Optional[Tuple] = None, cname_aliases: Optional[list] = None, dns_view: Optional[str] = "default", extattrs: Optional[dict] = None, ) -> Union[HostAddresses, str]: # TODO: should hostnames be unique # (fail if hostname already exists in this domain/service)? if cname_aliases is None: cname_aliases = [] if extattrs is None: extattrs = {} assert addrs or networks, "Neither networks nor host addresses could be derived to allocate host." oss = settings.load_oss_params() assert oss.IPAM.INFOBLOX infoblox_params = oss.IPAM.INFOBLOX # If networks isn't None, allocate host in those networks. if networks: ipv4_network = networks[0] ipv6_network = networks[1] assert ip_network_version(ipv4_network) == 4 assert ip_network_version(ipv6_network) == 6 # Find the next available IP address in each network # If requested error doesn't exist, return error network_info = find_networks(network=ipv4_network, ip_version=4) if len(network_info) != 1: return "IPV4_NETWORK_NOT_FOUND" assert "_ref" in network_info[0] ipv4_addr = find_next_available_ip(infoblox_params, network_info[0]["_ref"]) network_info = find_networks(network=ipv6_network, ip_version=6) if len(network_info) != 1: return "IPV6_NETWORK_NOT_FOUND" assert "_ref" in network_info[0] ipv6_addr = find_next_available_ip(infoblox_params, network_info[0]["_ref"]) # If couldn't find next available IPs, return error if ipv4_addr == "NETWORK_FULL" or ipv6_addr == "NETWORK_FULL": if ipv4_addr == "NETWORK_FULL": return "IPV4_NETWORK_FULL" if ipv6_addr == "NETWORK_FULL": return "IPV6_NETWORK_FULL" # Otherwise if addrs isn't None, allocate host with those addresses. else: ipv4_addr = addrs[0] ipv6_addr = addrs[1] assert ip_addr_version(ipv4_addr) == 4 assert ip_addr_version(ipv6_addr) == 6 # hostname parameter must be full name including domain name. req_payload = { "ipv4addrs": [{"ipv4addr": ipv4_addr}], "ipv6addrs": [{"ipv6addr": ipv6_addr}], "name": hostname, "configure_for_dns": True, "view": dns_view, "extattrs": extattrs, } r = requests.post( f"{wapi(infoblox_params)}/record:host", json=req_payload, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert isinstance(r.json(), str) assert r.json().startswith("record:host/") if cname_aliases: cname_req_payload = {"name": "", "canonical": hostname, "view": dns_view, "extattrs": extattrs} for alias in cname_aliases: cname_req_payload["name"] = alias r = requests.post( f"{wapi(infoblox_params)}/record:cname", json=cname_req_payload, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert r.json().startswith("record:cname/") return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr)) def allocate_host( # noqa: C901 hostname: str = "", service_type: str = "", service_networks: Optional[ServiceNetworks] = None, host_addresses: Optional[HostAddresses] = None, cname_aliases: Optional[list] = None, extattrs: Optional[dict] = None, ) -> HostAddresses: """Allocate host record with both IPv4 and IPv6 address, and respective DNS A and AAAA records. Args: ---- hostname (str): hostname of the host (without domain name, which is taken from the service type) service_type (str): the name of the service type (e.g. "TRUNK") service_networks (ServiceNetworks, optional): ipv4 and ipv6 network to allocate host host_addresses (HostAddresses, optional): ipv4 and ipv6 addresses to allocate host (service_networks has precedence) cname_aliases (list, optional): to create CNAME records in addition to the host record extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"}) Returns: ------- (HostAddresses): ipv4 and ipv6 addresses of the allocated host """ if cname_aliases is None: cname_aliases = [] if extattrs is None: extattrs = {} oss = settings.load_oss_params() assert oss.IPAM ipam_params = oss.IPAM assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks domain_name = getattr(ipam_params, service_type).domain_name dns_view = getattr(ipam_params, service_type).dns_view assert (oss_ipv4_containers and oss_ipv6_containers) or ( oss_ipv4_networks and oss_ipv6_networks ), "This service is missing either containers or networks configuration." assert domain_name, "This service is missing domain_name configuration." assert dns_view, "This service is missing dns_view configuration." if cname_aliases: cname_aliases = [alias + domain_name for alias in cname_aliases] # When neither service_networks not host_addresses are provided: # If service has configured containers, new ipv4 and ipv6 networks are created and those are used. # Note that in this case extattrs is for the hosts and not for the networks. # If service doesn't have configured containers and has configured networks instead, the configured # networks are used (they are filled up in order of appearance in the configuration file). if not service_networks and not host_addresses: if oss_ipv4_containers and oss_ipv6_containers: # This service has configured containers. # Use them to allocate new networks that can allocate the hosts. # IPv4 ipv4_network = str(allocate_ipv4_network(service_type=service_type).v4) assert ipv4_network, "No available space for IPv4 networks for this service type." # IPv6 ipv6_network = str(allocate_ipv6_network(service_type=service_type).v6) assert ipv6_network, "No available space for IPv6 networks for this service type." elif oss_ipv4_networks and oss_ipv6_networks: # This service has configured networks. # Allocate a host inside an ipv4 and ipv6 network from among them. ipv4_network = str(oss_ipv4_networks[0]) ipv6_network = str(oss_ipv6_networks[0]) ipv4_network_index = 0 ipv6_network_index = 0 while True: network_tuple = (ipv4_network, ipv6_network) host = allocate_host_inner( hostname=hostname + domain_name, networks=network_tuple, cname_aliases=cname_aliases, dns_view=dns_view, extattrs=extattrs, ) if "NETWORK_FULL" not in host and "NETWORK_NOT_FOUND" not in host: break if "IPV4" in host: ipv4_network_index += 1 assert oss_ipv4_networks, "No available space in any IPv4 network for this service." assert ipv4_network_index < len( oss_ipv4_networks ), "No available space in any IPv4 network for this service." ipv4_network = str(oss_ipv4_networks[ipv4_network_index]) else: # "IPV6" in host ipv6_network_index += 1 assert oss_ipv6_networks, "No available space in any IPv6 network for this service." assert ipv6_network_index < len( oss_ipv6_networks ), "No available space in any IPv6 network for this service." ipv6_network = str(oss_ipv6_networks[ipv6_network_index]) elif service_networks: ipv4_network = service_networks.v4 ipv6_network = service_networks.v6 assert_network_in_service( ipv4_network, ipv6_network, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks ) host = allocate_host_inner( hostname=hostname + domain_name, networks=(str(ipv4_network), str(ipv6_network)), cname_aliases=cname_aliases, dns_view=dns_view, extattrs=extattrs, ) assert "NETWORK_FULL" not in host, "Network is full." assert "NETWORK_NOT_FOUND" not in host, "Network does not exist in IPAM. Create it first." elif host_addresses: ipv4_addr = host_addresses.v4 ipv6_addr = host_addresses.v6 assert_host_in_service( ipv4_addr, ipv6_addr, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks ) host = allocate_host_inner( hostname=hostname + domain_name, addrs=(str(ipv4_addr), str(ipv6_addr)), cname_aliases=cname_aliases, dns_view=dns_view, extattrs=extattrs, ) assert "NETWORK_FULL" not in host return host def delete_network( network: ipaddress.ip_network = None, service_type: str = "" ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: """Delete IPv4 or IPv6 network by CIDR.""" oss = settings.load_oss_params() assert oss.IPAM ipam_params = oss.IPAM assert ipam_params.INFOBLOX infoblox_params = ipam_params.INFOBLOX assert network, "No network specified to delete." assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." ip_version = ip_network_version(str(network)) # Ensure that the network to be deleted is under the service type. # Otherwise user isn't allowed to delete it oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks ipv4_network = None ipv6_network = None if ip_version == 4: ipv4_network = network else: ipv6_network = network assert_network_in_service( ipv4_network, ipv6_network, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks ) network_info = find_networks(network=str(network), ip_version=ip_version) assert len(network_info) == 1, "Network to delete does not exist in IPAM." assert "_ref" in network_info[0], "Network to delete does not exist in IPAM." r = requests.delete( f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}', auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" # Extract ipv4/ipv6 address from the network reference obtained in the # response r_text = r.text network_address = ipaddress.ip_network(r_text.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) if ip_version == 4: return V4ServiceNetwork(v4=ipaddress.ip_network(network_address)) return V6ServiceNetwork(v6=ipaddress.ip_network(network_address)) def delete_host( hostname: str = "", host_addresses: HostAddresses = None, cname_aliases: Optional[list] = None, service_type: str = "", ) -> HostAddresses: """Delete host record and associated CNAME records. All arguments passed to this function must match together a host record in IPAM, and all CNAME records associated to it must also be passed exactly. """ if cname_aliases is None: cname_aliases = [] oss = settings.load_oss_params() assert oss.IPAM ipam_params = oss.IPAM assert ipam_params.INFOBLOX infoblox_params = ipam_params.INFOBLOX assert host_addresses, "No host specified to delete." assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks domain_name = getattr(ipam_params, service_type).domain_name dns_view = getattr(ipam_params, service_type).dns_view ipv4_addr = str(host_addresses.v4) ipv6_addr = str(host_addresses.v6) assert_host_in_service( host_addresses.v4, host_addresses.v6, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks, ) # Find host record reference r = requests.get( f"{wapi(infoblox_params)}/record:host", params={ "name": (hostname + domain_name).lower(), # hostnames are lowercase "ipv4addr": ipv4_addr, "ipv6addr": ipv6_addr, "view": dns_view, }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, ) host_data = r.json() assert len(host_data) == 1, "Host to delete does not exist in IPAM." assert "_ref" in host_data[0], "Host to delete does not exist in IPAM." host_ref = host_data[0]["_ref"] # Find CNAME records reference r = requests.get( f"{wapi(infoblox_params)}/record:cname", params={ "canonical": hostname + domain_name, "view": dns_view, }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, ) cname_data = r.json() provided_cnames = [item + domain_name for item in cname_aliases] found_cnames = [item["name"] for item in cname_data if "name" in item] assert provided_cnames == found_cnames, "Provided CNAME alias names don't match the ones poiting to hostname." # Delete the host record r = requests.delete( f"{wapi(infoblox_params)}/{host_ref}", auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" # Delete the CNAME records cname_refs = [item["_ref"] for item in cname_data if "name" in item] for cname_ref in cname_refs: r = requests.delete( f"{wapi(infoblox_params)}/{cname_ref}", auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" return host_addresses def validate_network( gso_subscription_id: str = "", network: ipaddress.ip_network = None ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: """Validate IPv4 or IPv6 network. Check if the specified network exist, and, if it does, check if its comment field contains gso_subscription_id. Returns the network if validation successful. """ assert network, "No network specified to validate." ip_version = ip_network_version(str(network)) network_info = find_networks(network=str(network), ip_version=ip_version) assert len(network_info) == 1 and "_ref" in network_info[0], "Network to validate not found in IPAM." assert "comment" in network_info[0], "Network to validate does not have comment in IPAM." assert ( gso_subscription_id in network_info[0]["comment"] ), "GSO subscription ID does not match the one in the comment field of the IPAM network." if ip_version == 4: return V4ServiceNetwork(v4=network) return V6ServiceNetwork(v6=network) def validate_host( hostname: str = "", host_addresses: HostAddresses = None, cname_aliases: Optional[list] = None, service_type: str = "", ) -> HostAddresses: """Validate host. Check if all arguments passed to this function match together a host record in IPAM, and all CNAME records associated to it also match exactly. Returns the host if validation successful. """ if cname_aliases is None: cname_aliases = [] oss = settings.load_oss_params() assert oss.IPAM ipam_params = oss.IPAM assert ipam_params.INFOBLOX infoblox_params = ipam_params.INFOBLOX assert hostname and host_addresses, "No host specified to validate. Either hostname or host_addresses missing." domain_name = getattr(ipam_params, service_type).domain_name ipv4_addr = str(host_addresses.v4) ipv6_addr = str(host_addresses.v6) dns_view = getattr(ipam_params, service_type).dns_view # Find host record reference r = requests.get( f"{wapi(infoblox_params)}/record:host", params={ "name": (hostname + domain_name).lower(), # hostnames are lowercase "ipv4addr": ipv4_addr, "ipv6addr": ipv6_addr, "view": dns_view, }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, ) host_data = r.json() assert len(host_data) == 1, "Host to validate does not exist in IPAM." assert "_ref" in host_data[0], "Host to validate does not exist in IPAM." # Find CNAME records reference r = requests.get( f"{wapi(infoblox_params)}/record:cname", params={ "canonical": hostname + domain_name, "view": dns_view, }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, ) cname_data = r.json() provided_cnames = [item + domain_name for item in cname_aliases] found_cnames = [item["name"] for item in cname_data if "name" in item] assert provided_cnames == found_cnames, "Provided CNAME alias names don't match the ones poiting to hostname." return host_addresses