# ipam.py: helpers for managing networks, host records and CNAME aliases in Infoblox IPAM.
# mypy: ignore-errors
Erik Reid's avatar
Erik Reid committed
import ipaddress
from enum import Enum
from typing import Optional, Tuple, Union

import requests
from pydantic import BaseSettings
from requests.auth import HTTPBasicAuth
Erik Reid's avatar
Erik Reid committed

class V4ServiceNetwork(BaseSettings):
    """Model wrapping a single IPv4 network.

    Returned by the IPv4 network allocation, deletion and validation helpers
    in this module.
    """

    # The IPv4 network, parsed/validated as ``ipaddress.IPv4Network``.
    v4: ipaddress.IPv4Network


class V6ServiceNetwork(BaseSettings):
    """Model wrapping a single IPv6 network.

    Returned by the IPv6 network allocation, deletion and validation helpers
    in this module.
    """

    # The IPv6 network, parsed/validated as ``ipaddress.IPv6Network``.
    v6: ipaddress.IPv6Network


class ServiceNetworks(BaseSettings):
    """Pair of one IPv4 and one IPv6 network (returned by ``allocate_networks``)."""

    # IPv4 network of the pair.
    v4: ipaddress.IPv4Network
    # IPv6 network of the pair.
    v6: ipaddress.IPv6Network
class V4HostAddress(BaseSettings):
    """Model wrapping a single IPv4 host address."""

    # The IPv4 address of the host.
    v4: ipaddress.IPv4Address


class V6HostAddress(BaseSettings):
    """Model wrapping a single IPv6 host address."""

    # The IPv6 address of the host.
    v6: ipaddress.IPv6Address


class HostAddresses(BaseSettings):
    """Pair of the IPv4 and IPv6 address of one host record.

    Returned by ``allocate_host_inner`` on success and passed to the host
    deletion/validation helpers.
    """

    # IPv4 address of the host.
    v4: ipaddress.IPv4Address
    # IPv6 address of the host.
    v6: ipaddress.IPv6Address
class IPAMErrors(Enum):
    """Known IPAM error responses.

    Each member's value is a ``(HTTP status code, message fragment)`` tuple;
    ``match_error_code`` compares both parts against a response.
    """

    # HTTP error code, match in error message
    CONTAINER_FULL = 400, "Can not find requested number of networks"
    NETWORK_FULL = 400, "Cannot find 1 available IP address(es) in this network"


# Value passed as ``timeout=`` to every ``requests`` call in this module
# (``requests`` interprets it as seconds).
REQUESTS_TIMEOUT = 20


# TODO: remove this!
# lab infoblox cert is not valid for the ipv4 address
#   ... disable warnings for now
# NOTE(review): the HTTP calls in this module use ``verify=False``; this call
# presumably silences the per-request urllib3 warnings that would cause.
requests.packages.urllib3.disable_warnings()


def match_error_code(response, error_code):
    """Tell whether an HTTP response corresponds to a known IPAM error.

    Args:
    ----
    response: object exposing ``status_code`` and ``text`` (e.g. a requests response)
    error_code: enum member whose ``value`` is ``(status code, message fragment)``

    Returns:
    -------
    (bool) True when the status code is equal and the fragment occurs in the response text.

    """
    # The message fragment is only inspected once the status code matched.
    if response.status_code != error_code.value[0]:
        return False
    return error_code.value[1] in response.text


def wapi(infoblox_params: settings.InfoBloxParams):
    """Build the base WAPI URL from the connection parameters.

    Args:
    ----
    infoblox_params (settings.InfoBloxParams): provides ``host`` and ``wapi_version``

    Returns:
    -------
    (str) ``https://<host>/wapi/<wapi_version>`` (no trailing slash)

    """
    host = infoblox_params.host
    version = infoblox_params.wapi_version
    return f"https://{host}/wapi/{version}"


def ip_addr_version(addr: str = ""):
    """Return the IP version of an address.

    Args:
    ----
    addr (str): the IP address to inspect

    Returns:
    -------
    (int) 4 for an IPv4 address, 6 for an IPv6 address.

    Raises:
    ------
    ValueError: if ``addr`` is not a valid IP address (raised by ``ipaddress.ip_address``)

    """
    # The parsed address object already knows its own version.
    return ipaddress.ip_address(addr).version


def ip_network_version(network: str = ""):
    """Return the IP version of a network given in CIDR notation.

    Args:
    ----
    network (str): the network to inspect

    Returns:
    -------
    (int) 4 for an IPv4 network, 6 for an IPv6 network.

    Raises:
    ------
    ValueError: if ``network`` is not a valid network (raised by ``ipaddress.ip_network``)

    """
    # The parsed network object already knows its own version.
    return ipaddress.ip_network(network).version


def _assert_addr_in_service(addr, containers, networks, family: str) -> None:
    """Assert that ``addr`` lies inside one of the containers, or else one of the networks."""
    # Accept both strings and ipaddress objects; membership tests need an address object.
    ip_addr = ipaddress.ip_address(addr)
    # Containers take precedence; a missing (None) list counts as "no candidates".
    candidates = containers or networks or ()
    assert any(
        ip_addr in candidate for candidate in candidates
    ), f"Host's {family} address doesn't belong to service type."


def assert_host_in_service(
    ipv4_addr: str = "",
    ipv6_addr: str = "",
    oss_ipv4_containers=None,
    oss_ipv6_containers=None,
    oss_ipv4_networks=None,
    oss_ipv6_networks=None,
):
    """Assert that a host's IPv4 and IPv6 addresses belong to a service type.

    For each address family the configured containers are checked when there
    are any; otherwise the configured networks are checked.

    Args:
    ----
    ipv4_addr: IPv4 address of the host (string or ``ipaddress.IPv4Address``)
    ipv6_addr: IPv6 address of the host (string or ``ipaddress.IPv6Address``)
    oss_ipv4_containers: IPv4 container networks configured for the service
    oss_ipv6_containers: IPv6 container networks configured for the service
    oss_ipv4_networks: IPv4 networks configured for the service
    oss_ipv6_networks: IPv6 networks configured for the service

    Raises:
    ------
    AssertionError: if an address is in none of the applicable containers/networks
        (including when none are configured)
    ValueError: if an address cannot be parsed as an IP address

    """
    _assert_addr_in_service(ipv4_addr, oss_ipv4_containers, oss_ipv4_networks, "IPv4")
    _assert_addr_in_service(ipv6_addr, oss_ipv6_containers, oss_ipv6_networks, "IPv6")


def _assert_net_in_service(network, containers, networks) -> None:
    """Assert that ``network`` is a subnet of a container, or else one of the listed networks."""
    if containers:
        assert any(
            network.subnet_of(container) for container in containers
        ), "Network doesn't belong to service type."
    else:
        # A missing (None) list counts as "no networks configured".
        assert network in (networks or ()), "Network doesn't belong to service type."


def assert_network_in_service(
    ipv4_network: Optional[ipaddress.IPv4Network] = None,
    ipv6_network: Optional[ipaddress.IPv6Network] = None,
    oss_ipv4_containers=None,
    oss_ipv6_containers=None,
    oss_ipv4_networks=None,
    oss_ipv6_networks=None,
):
    """Assert that the given network(s) belong to a service type.

    A network belongs to the service when it is a subnet of one of the
    configured containers or, when no containers are configured, when it is
    exactly one of the configured networks. Families whose network argument
    is not given are skipped.

    Args:
    ----
    ipv4_network: IPv4 network to check (optional)
    ipv6_network: IPv6 network to check (optional)
    oss_ipv4_containers: IPv4 container networks configured for the service
    oss_ipv6_containers: IPv6 container networks configured for the service
    oss_ipv4_networks: IPv4 networks configured for the service
    oss_ipv6_networks: IPv6 networks configured for the service

    Raises:
    ------
    AssertionError: if a given network does not belong to the service type

    """
    if ipv4_network:
        _assert_net_in_service(ipv4_network, oss_ipv4_containers, oss_ipv4_networks)
    if ipv6_network:
        _assert_net_in_service(ipv6_network, oss_ipv6_containers, oss_ipv6_networks)


def find_networks(network_container: Optional[str] = "", network: Optional[str] = "", ip_version: int = 4):
    """Get all networks, optionally filtering by a container or by a network.

    When filtering by ``network`` the ``comment`` field is requested in the
    result. ``network_container`` takes precedence when both filters are given.

    Args:
    ----
    network_container (str, optional): container to filter by
    network (str, optional): network to filter by
    ip_version (int): 4 or 6

    Returns:
    -------
    (list) all found networks matching the args, which may be empty.

    Raises:
    ------
    AssertionError: on an invalid ``ip_version``, missing Infoblox configuration,
        or a non-2xx HTTP response

    """
    assert ip_version in [4, 6]
    oss = settings.load_oss_params()
    assert oss.IPAM.INFOBLOX
    infoblox_params = oss.IPAM.INFOBLOX
    endpoint = "network" if ip_version == 4 else "ipv6network"

    # No filter at all means "list every network of this IP version".
    params = None
    if network_container:
        params = {"network_container": network_container}
    elif network:
        params = {"network": network, "_return_fields": "comment"}

    r = requests.get(
        f"{wapi(infoblox_params)}/{endpoint}",
        params=params,
        auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
        verify=False,  # noqa: S501
        timeout=REQUESTS_TIMEOUT,
    )
    assert 200 <= r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
    return r.json()


def allocate_network_inner(
    infoblox_params: settings.InfoBloxParams,
    network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams],
    ip_version: int = 4,
    comment: Optional[str] = "",
    extattrs: Optional[dict] = None,
) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
    """Allocate the next available network from the service's containers.

    The containers in ``network_params.containers`` are tried in order; the
    next one is only tried when the previous one reports that it is full.

    Args:
    ----
    infoblox_params (settings.InfoBloxParams): connection parameters (host, WAPI version, credentials)
    network_params: provides ``containers`` to allocate from and the ``mask`` (CIDR length) to request
    ip_version (int): 4 or 6
    comment (str, optional): a custom comment to write in the comment field in IPAM
    extattrs (dict, optional): any extensible attributes to add in IPAM

    Returns:
    -------
    (V4ServiceNetwork | V6ServiceNetwork): the allocated network, matching ``ip_version``

    Raises:
    ------
    AssertionError: on an invalid ``ip_version``, when no containers are configured,
        on a non-2xx HTTP response (including "all containers full"), or when the
        response carries no ``network`` field

    """
    if extattrs is None:
        extattrs = {}
    assert ip_version in [4, 6]
    endpoint = "network" if ip_version == 4 else "ipv6network"
    ip_container = "networkcontainer" if ip_version == 4 else "ipv6networkcontainer"

    assert network_params.containers, (
        "No containers available to allocate networks for this service. "
        "Maybe you want to allocate a host from a network directly?"
    )

    # only return in the response the allocated network, not all available
    # TODO: any validation needed for extattrs wherever it's used?
    object_parameters = {"network": str(network_params.containers[0])}
    req_payload = {
        "network": {
            "_object_function": "next_available_network",
            "_parameters": {"cidr": network_params.mask},
            "_object": ip_container,
            "_object_parameters": object_parameters,
            "_result_field": "networks",
        },
        "comment": comment,
        "extattrs": extattrs,
    }

    # Try each container in turn; stop at the first response that is not "container full".
    # If every container is full, ``r`` keeps the last error response and the
    # status assertion below reports it.
    for container in network_params.containers:
        object_parameters["network"] = str(container)
        r = requests.post(
            f"{wapi(infoblox_params)}/{endpoint}",
            params={"_return_fields": "network"},
            json=req_payload,
            auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
            headers={"content-type": "application/json"},
            verify=False,  # noqa: S501
            timeout=REQUESTS_TIMEOUT,
        )
        if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL):
            break

    assert 200 <= r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"

    response_body = r.json()
    assert "network" in response_body
    allocated_network = response_body["network"]
    if ip_version == 4:
        return V4ServiceNetwork(v4=ipaddress.ip_network(allocated_network))
    return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network))
def allocate_ipv4_network(
    service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None
) -> V4ServiceNetwork:
    """Allocate IPv4 network within the container of the specified service type.

    Args:
    ----
    service_type (str): the name of the service type (e.g. "TRUNK")
    comment (str, optional): a custom comment to write in the comment field in IPAM
    extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"})

    Returns:
    -------
    (V4ServiceNetwork): the allocated network

    Raises:
    ------
    AssertionError: if IPAM is not configured or ``service_type`` is not a valid service type

    """
    if extattrs is None:
        extattrs = {}
    oss = settings.load_oss_params()
    assert oss.IPAM
    ipam_params = oss.IPAM
    # "INFOBLOX" is an attribute of the IPAM settings but holds connection parameters, not a service.
    assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type."
    return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs)
def allocate_ipv6_network(
    service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None
) -> V6ServiceNetwork:
    """Allocate IPv6 network within the container of the specified service type.

    Args:
    ----
    service_type (str): the name of the service type (e.g. "TRUNK")
    comment (str, optional): a custom comment to write in the comment field in IPAM
    extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"})

    Returns:
    -------
    (V6ServiceNetwork): the allocated network

    Raises:
    ------
    AssertionError: if IPAM is not configured or ``service_type`` is not a valid service type

    """
    if extattrs is None:
        extattrs = {}
    oss = settings.load_oss_params()
    assert oss.IPAM
    ipam_params = oss.IPAM
    # "INFOBLOX" is an attribute of the IPAM settings but holds connection parameters, not a service.
    assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type."
    return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6, comment, extattrs)


def allocate_networks(
    service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None
) -> ServiceNetworks:
    """Allocate one IPv4 and one IPv6 network for the given service type.

    The IPv4 network is allocated first, then the IPv6 one; both receive the
    same comment and extensible attributes.

    Args:
    ----
    service_type (str): the name of the service type
    comment (str, optional): comment to store with both networks
    extattrs (dict, optional): extensible attributes to store with both networks

    Returns:
    -------
    (ServiceNetworks): the allocated IPv4 and IPv6 networks

    """
    shared_kwargs = {
        "service_type": service_type,
        "comment": comment,
        "extattrs": {} if extattrs is None else extattrs,
    }
    allocated_v4 = allocate_ipv4_network(**shared_kwargs)
    allocated_v6 = allocate_ipv6_network(**shared_kwargs)
    return ServiceNetworks(v4=allocated_v4.v4, v6=allocated_v6.v6)
def find_next_available_ip(infoblox_params, network_ref: str = ""):
    """Find the next available IP address from a network given its ref.

    Args:
    ----
    infoblox_params (settings.InfoBloxParams): infoblox params
    network_ref (str): the network to find the next available ip, in InfoBlox reference format

    Returns:
    -------
    (str): next available ip in the network, or "NETWORK_FULL" if there's no space in the network

    Raises:
    ------
    AssertionError: on a non-2xx HTTP response other than "network full", or when the
        response does not contain exactly one address under ``ips``

    """
    # NOTE(review): the request line was missing from the reviewed text; WAPI object
    # functions (``_function=...``) are invoked with POST - confirm against the original.
    r = requests.post(
        f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1",
        auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
        verify=False,  # noqa: S501
        timeout=REQUESTS_TIMEOUT,
    )

    # A full network is an expected outcome, reported to the caller as a sentinel string.
    if match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL):
        return "NETWORK_FULL"

    assert 200 <= r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
    response_body = r.json()
    assert "ips" in response_body
    received_ip = response_body["ips"]
    assert len(received_ip) == 1
    return received_ip[0]


def allocate_host_inner(  # noqa: C901
    hostname: str = "",
    addrs: Optional[Tuple] = None,
    networks: Optional[Tuple] = None,
    cname_aliases: Optional[list] = None,
    dns_view: Optional[str] = "default",
    extattrs: Optional[dict] = None,
) -> Union[HostAddresses, str]:
    """Create a host record (IPv4 + IPv6) and optional CNAME records in IPAM.

    Args:
    ----
    hostname (str): full host name, including the domain name
    addrs (tuple, optional): ``(ipv4 address, ipv6 address)`` to assign; used only when ``networks`` is not given
    networks (tuple, optional): ``(ipv4 network, ipv6 network)``; the next available address of each is used
    cname_aliases (list, optional): full alias names to create as CNAME records pointing to ``hostname``
    dns_view (str, optional): DNS view for the host and CNAME records
    extattrs (dict, optional): extensible attributes stored with the host and CNAME records

    Returns:
    -------
    (HostAddresses): the addresses of the created host, or one of the strings
    "IPV4_NETWORK_NOT_FOUND", "IPV6_NETWORK_NOT_FOUND", "IPV4_NETWORK_FULL",
    "IPV6_NETWORK_FULL" when no address could be obtained from ``networks``.

    Raises:
    ------
    AssertionError: if neither ``addrs`` nor ``networks`` is given, on an IP version
        mismatch, or on an unexpected or non-2xx IPAM response

    """
    # TODO: should hostnames be unique
    # (i.e. fail if hostname already exists in this domain/service)?
    if cname_aliases is None:
        cname_aliases = []
    if extattrs is None:
        extattrs = {}
    assert addrs or networks, "Neither networks nor host addresses could be derived to allocate host."
    oss = settings.load_oss_params()
    assert oss.IPAM.INFOBLOX
    infoblox_params = oss.IPAM.INFOBLOX

    # If networks is not None, allocate host in those networks.
    if networks:
        ipv4_network = networks[0]
        ipv6_network = networks[1]
        assert ip_network_version(ipv4_network) == 4
        assert ip_network_version(ipv6_network) == 6

        # Find the next available IP address in each network
        # If requested network doesn't exist, return error
        network_info = find_networks(network=ipv4_network, ip_version=4)
        if len(network_info) != 1:
            return "IPV4_NETWORK_NOT_FOUND"
        assert "_ref" in network_info[0]
        ipv4_addr = find_next_available_ip(infoblox_params, network_info[0]["_ref"])

        network_info = find_networks(network=ipv6_network, ip_version=6)
        if len(network_info) != 1:
            return "IPV6_NETWORK_NOT_FOUND"
        assert "_ref" in network_info[0]
        ipv6_addr = find_next_available_ip(infoblox_params, network_info[0]["_ref"])

        # If couldn't find next available IPs, return error (IPv4 is reported first)
        if ipv4_addr == "NETWORK_FULL":
            return "IPV4_NETWORK_FULL"
        if ipv6_addr == "NETWORK_FULL":
            return "IPV6_NETWORK_FULL"

    # Otherwise if addrs is not None, allocate host with those addresses.
    else:
        ipv4_addr = addrs[0]
        ipv6_addr = addrs[1]
        assert ip_addr_version(ipv4_addr) == 4
        assert ip_addr_version(ipv6_addr) == 6

    # hostname parameter must be full name including domain name.
    req_payload = {
        "ipv4addrs": [{"ipv4addr": ipv4_addr}],
        "ipv6addrs": [{"ipv6addr": ipv6_addr}],
        "name": hostname,
        "configure_for_dns": True,
        "view": dns_view,
        "extattrs": extattrs,
    }

    r = requests.post(
        f"{wapi(infoblox_params)}/record:host",
        json=req_payload,
        auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
        verify=False,  # noqa: S501
        timeout=REQUESTS_TIMEOUT,
    )
    assert 200 <= r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
    # On success the response body is expected to be the reference of the new record.
    host_ref = r.json()
    assert isinstance(host_ref, str)
    assert host_ref.startswith("record:host/")

    # Create one CNAME record per alias, all pointing to the new host.
    for alias in cname_aliases:
        cname_req_payload = {"name": alias, "canonical": hostname, "view": dns_view, "extattrs": extattrs}
        r = requests.post(
            f"{wapi(infoblox_params)}/record:cname",
            json=cname_req_payload,
            auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
            verify=False,  # noqa: S501
            timeout=REQUESTS_TIMEOUT,
        )
        assert 200 <= r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
        assert r.json().startswith("record:cname/")

    return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr))


def allocate_host(  # noqa: C901
    hostname: str = "",
    service_type: str = "",
    service_networks: Optional[ServiceNetworks] = None,
    host_addresses: Optional[HostAddresses] = None,
    cname_aliases: Optional[list] = None,
    extattrs: Optional[dict] = None,
) -> HostAddresses:
    """Allocate host record with both IPv4 and IPv6 address, and respective DNS A and AAAA records.

    Args:
    ----
    hostname (str): hostname of the host (without domain name, which is taken from the service type)
    service_type (str): the name of the service type (e.g. "TRUNK")
    service_networks (ServiceNetworks, optional): ipv4 and ipv6 network to allocate host
    host_addresses (HostAddresses, optional): ipv4 and ipv6 addresses to allocate host (service_networks has precedence)
    cname_aliases (list, optional): to create cname records in addition to the host record
    extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"})

    Returns:
    -------
    (HostAddresses): ipv4 and ipv6 addresses of the allocated host

    Raises:
    ------
    AssertionError: on invalid/incomplete service configuration, when the given
        networks/addresses do not belong to the service, or when no space is left

    """
    if cname_aliases is None:
        cname_aliases = []
    if extattrs is None:
        extattrs = {}
    oss = settings.load_oss_params()
    assert oss.IPAM
    ipam_params = oss.IPAM

    assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type."
    service_params = getattr(ipam_params, service_type)
    oss_ipv4_containers = service_params.V4.containers
    oss_ipv6_containers = service_params.V6.containers
    oss_ipv4_networks = service_params.V4.networks
    oss_ipv6_networks = service_params.V6.networks
    domain_name = service_params.domain_name
    dns_view = service_params.dns_view

    assert (oss_ipv4_containers and oss_ipv6_containers) or (
        oss_ipv4_networks and oss_ipv6_networks
    ), "This service is missing either containers or networks configuration."
    assert domain_name, "This service is missing domain_name configuration."
    assert dns_view, "This service is missing dns_view configuration."

    # Aliases and hostname are given without domain; IPAM needs the full names.
    if cname_aliases:
        cname_aliases = [alias + domain_name for alias in cname_aliases]
    full_hostname = hostname + domain_name

    # When neither service_networks nor host_addresses are provided:
    # If service has configured containers, new ipv4 and ipv6 networks are created and those are used.
    # Note that in this case extattrs is for the hosts and not for the networks.
    # If service doesn't have configured containers and has configured networks instead, the configured
    # networks are used (they are filled up in order of appearance in the configuration file).
    if not service_networks and not host_addresses:
        if oss_ipv4_containers and oss_ipv6_containers:
            # This service has configured containers.
            # Use them to allocate new networks that can allocate the hosts.

            # IPv4
            ipv4_network = str(allocate_ipv4_network(service_type=service_type).v4)
            assert ipv4_network, "No available space for IPv4 networks for this service type."

            # IPv6
            ipv6_network = str(allocate_ipv6_network(service_type=service_type).v6)
            assert ipv6_network, "No available space for IPv6 networks for this service type."

        elif oss_ipv4_networks and oss_ipv6_networks:
            # This service has configured networks.
            # Allocate a host inside an ipv4 and ipv6 network from among them.
            ipv4_network = str(oss_ipv4_networks[0])
            ipv6_network = str(oss_ipv6_networks[0])

        ipv4_network_index = 0
        ipv6_network_index = 0
        while True:
            host = allocate_host_inner(
                hostname=full_hostname,
                networks=(ipv4_network, ipv6_network),
                cname_aliases=cname_aliases,
                dns_view=dns_view,
                extattrs=extattrs,
            )

            # allocate_host_inner signals a full/missing network with a string; anything else is success.
            failed = isinstance(host, str) and ("NETWORK_FULL" in host or "NETWORK_NOT_FOUND" in host)
            if not failed:
                break

            # Move on to the next configured network of the family that failed.
            if "IPV4" in host:
                ipv4_network_index += 1
                assert oss_ipv4_networks, "No available space in any IPv4 network for this service."
                assert ipv4_network_index < len(
                    oss_ipv4_networks
                ), "No available space in any IPv4 network for this service."
                ipv4_network = str(oss_ipv4_networks[ipv4_network_index])
            else:  # "IPV6" in host
                ipv6_network_index += 1
                assert oss_ipv6_networks, "No available space in any IPv6 network for this service."
                assert ipv6_network_index < len(
                    oss_ipv6_networks
                ), "No available space in any IPv6 network for this service."
                ipv6_network = str(oss_ipv6_networks[ipv6_network_index])

    elif service_networks:
        ipv4_network = service_networks.v4
        ipv6_network = service_networks.v6
        assert_network_in_service(
            ipv4_network, ipv6_network, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks
        )
        host = allocate_host_inner(
            hostname=full_hostname,
            networks=(str(ipv4_network), str(ipv6_network)),
            cname_aliases=cname_aliases,
            dns_view=dns_view,
            extattrs=extattrs,
        )
        assert not (isinstance(host, str) and "NETWORK_FULL" in host), "Network is full."
        assert not (
            isinstance(host, str) and "NETWORK_NOT_FOUND" in host
        ), "Network does not exist in IPAM. Create it first."

    elif host_addresses:
        ipv4_addr = host_addresses.v4
        ipv6_addr = host_addresses.v6
        assert_host_in_service(
            ipv4_addr, ipv6_addr, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks
        )

        host = allocate_host_inner(
            hostname=full_hostname,
            addrs=(str(ipv4_addr), str(ipv6_addr)),
            cname_aliases=cname_aliases,
            dns_view=dns_view,
            extattrs=extattrs,
        )
        assert not (isinstance(host, str) and "NETWORK_FULL" in host)

    return host


def delete_network(
    network: ipaddress.ip_network = None, service_type: str = ""
) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
    """Delete IPv4 or IPv6 network by CIDR.

    Args:
    ----
    network: the network to delete (an ``ipaddress`` IPv4 or IPv6 network)
    service_type (str): the service type the network must belong to

    Returns:
    -------
    (V4ServiceNetwork | V6ServiceNetwork): the deleted network, parsed from the
    reference returned by IPAM

    Raises:
    ------
    AssertionError: if no network is given, the service type is invalid, the network
        does not belong to the service or does not exist in IPAM, or IPAM answers
        with a non-2xx status

    """
    oss = settings.load_oss_params()
    assert oss.IPAM
    ipam_params = oss.IPAM
    assert ipam_params.INFOBLOX
    infoblox_params = ipam_params.INFOBLOX

    assert network, "No network specified to delete."
    assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type."

    ip_version = ip_network_version(str(network))

    # Ensure that the network to be deleted is under the service type.
    # Otherwise user is not allowed to delete it
    service_params = getattr(ipam_params, service_type)
    oss_ipv4_containers = service_params.V4.containers
    oss_ipv6_containers = service_params.V6.containers
    oss_ipv4_networks = service_params.V4.networks
    oss_ipv6_networks = service_params.V6.networks
    # Only the slot matching the network's IP version is filled; the other stays None
    # so that assert_network_in_service skips it.
    ipv4_network = None
    ipv6_network = None
    if ip_version == 4:
        ipv4_network = network
    else:
        ipv6_network = network
    assert_network_in_service(
        ipv4_network, ipv6_network, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks
    )

    network_info = find_networks(network=str(network), ip_version=ip_version)
    assert len(network_info) == 1, "Network to delete does not exist in IPAM."
    assert "_ref" in network_info[0], "Network to delete does not exist in IPAM."

    r = requests.delete(
        f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}',
        auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
        verify=False,  # noqa: S501
        timeout=REQUESTS_TIMEOUT,
    )
    assert 200 <= r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"

    # Extract ipv4/ipv6 address from the network reference obtained in the
    # response: drop the last "/"-separated segment, take the part after the
    # first ":" and decode URL-encoded colons ("%3A") of IPv6 addresses.
    r_text = r.text
    network_address = ipaddress.ip_network(r_text.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":"))
    if ip_version == 4:
        return V4ServiceNetwork(v4=ipaddress.ip_network(network_address))
    return V6ServiceNetwork(v6=ipaddress.ip_network(network_address))
def delete_host(
    hostname: str = "",
    host_addresses: HostAddresses = None,
    cname_aliases: Optional[list] = None,
    service_type: str = "",
) -> HostAddresses:
    """Delete host record and associated CNAME records.

    All arguments passed to this function must match together a host record in
    IPAM, and all CNAME records associated to it must also be passed exactly.

    Args:
    ----
    hostname (str): hostname without domain name (the domain is taken from the service type)
    host_addresses (HostAddresses): ipv4 and ipv6 addresses of the host record
    cname_aliases (list, optional): alias names (without domain) of the CNAME records pointing to the host
    service_type (str): the name of the service type the host belongs to

    Returns:
    -------
    (HostAddresses): the addresses of the deleted host (the ``host_addresses`` argument)

    Raises:
    ------
    AssertionError: on invalid arguments/configuration, when the host or its CNAME
        records do not match IPAM, or on a non-2xx HTTP response

    """
    if cname_aliases is None:
        cname_aliases = []
    oss = settings.load_oss_params()
    assert oss.IPAM
    ipam_params = oss.IPAM
    assert ipam_params.INFOBLOX
    infoblox_params = ipam_params.INFOBLOX

    assert host_addresses, "No host specified to delete."
    assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type."
    service_params = getattr(ipam_params, service_type)
    oss_ipv4_containers = service_params.V4.containers
    oss_ipv6_containers = service_params.V6.containers
    oss_ipv4_networks = service_params.V4.networks
    oss_ipv6_networks = service_params.V6.networks
    domain_name = service_params.domain_name
    dns_view = service_params.dns_view
    ipv4_addr = str(host_addresses.v4)
    ipv6_addr = str(host_addresses.v6)

    assert_host_in_service(
        host_addresses.v4,
        host_addresses.v6,
        oss_ipv4_containers,
        oss_ipv6_containers,
        oss_ipv4_networks,
        oss_ipv6_networks,
    )

    # Find host record reference
    r = requests.get(
        f"{wapi(infoblox_params)}/record:host",
        params={
            "name": (hostname + domain_name).lower(),  # hostnames are lowercase
            "ipv4addr": ipv4_addr,
            "ipv6addr": ipv6_addr,
            "view": dns_view,
        },
        auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
        verify=False,  # noqa: S501
        timeout=REQUESTS_TIMEOUT,
    )
    assert 200 <= r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
    host_data = r.json()
    assert len(host_data) == 1, "Host to delete does not exist in IPAM."
    assert "_ref" in host_data[0], "Host to delete does not exist in IPAM."
    host_ref = host_data[0]["_ref"]

    # Find cname records reference
    r = requests.get(
        f"{wapi(infoblox_params)}/record:cname",
        params={
            "canonical": hostname + domain_name,
            "view": dns_view,
        },
        auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
        verify=False,  # noqa: S501
        timeout=REQUESTS_TIMEOUT,
    )
    assert 200 <= r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
    cname_data = r.json()
    # Nothing is deleted unless the caller named exactly the aliases found in IPAM
    # (the comparison is order-sensitive).
    provided_cnames = [item + domain_name for item in cname_aliases]
    found_cnames = [item["name"] for item in cname_data if "name" in item]
    assert provided_cnames == found_cnames, "Provided CNAME alias names don't match the ones poiting to hostname."

    # Delete the host record
    r = requests.delete(
        f"{wapi(infoblox_params)}/{host_ref}",
        auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
        verify=False,  # noqa: S501
        timeout=REQUESTS_TIMEOUT,
    )
    assert 200 <= r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"

    # Delete the CNAME records
    cname_refs = [item["_ref"] for item in cname_data if "name" in item]
    for cname_ref in cname_refs:
        r = requests.delete(
            f"{wapi(infoblox_params)}/{cname_ref}",
            auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
            verify=False,  # noqa: S501
            timeout=REQUESTS_TIMEOUT,
        )
        assert 200 <= r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"

    return host_addresses


# NOTE(review): the ``def`` line of this function was missing from the reviewed text;
# the name ``validate_network`` is inferred from its docstring and from the sibling
# ``validate_host`` - confirm against the original.
def validate_network(
    gso_subscription_id: str = "", network: ipaddress.ip_network = None
) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
    """Validate IPv4 or IPv6 network.

    Check if the specified network exist, and, if it does, check if its comment field contains gso_subscription_id.
    Returns the network if validation successful.

    Args:
    ----
    gso_subscription_id (str): subscription ID expected inside the network's comment field
    network: the network to validate (an ``ipaddress`` IPv4 or IPv6 network)

    Returns:
    -------
    (V4ServiceNetwork | V6ServiceNetwork): the validated network

    Raises:
    ------
    AssertionError: if no network is given, it is not found in IPAM, it has no comment,
        or the comment does not contain ``gso_subscription_id``

    """
    assert network, "No network specified to validate."

    ip_version = ip_network_version(str(network))
    network_info = find_networks(network=str(network), ip_version=ip_version)
    assert len(network_info) == 1 and "_ref" in network_info[0], "Network to validate not found in IPAM."
    assert "comment" in network_info[0], "Network to validate does not have comment in IPAM."
    assert (
        gso_subscription_id in network_info[0]["comment"]
    ), "GSO subscription ID does not match the one in the comment field of the IPAM network."
    if ip_version == 4:
        return V4ServiceNetwork(v4=network)
    return V6ServiceNetwork(v6=network)
JORGE SASIAIN's avatar
JORGE SASIAIN committed


def validate_host(
    hostname: str = "",
    host_addresses: HostAddresses = None,
    cname_aliases: Optional[list] = None,
    service_type: str = "",
) -> HostAddresses:
    """Validate host.

    Check if all arguments passed to this function match together a host record in
    IPAM, and all CNAME records associated to it also match exactly.
    Returns the host if validation successful.

    Args:
    ----
    hostname (str): hostname without domain name (the domain is taken from the service type)
    host_addresses (HostAddresses): ipv4 and ipv6 addresses of the host record
    cname_aliases (list, optional): alias names (without domain) of the CNAME records pointing to the host
    service_type (str): the name of the service type the host belongs to

    Returns:
    -------
    (HostAddresses): the validated host addresses (the ``host_addresses`` argument)

    Raises:
    ------
    AssertionError: on invalid arguments/configuration, when the host or its CNAME
        records do not match IPAM, or on a non-2xx HTTP response

    """
    if cname_aliases is None:
        cname_aliases = []
    oss = settings.load_oss_params()
    assert oss.IPAM
    ipam_params = oss.IPAM
    assert ipam_params.INFOBLOX
    infoblox_params = ipam_params.INFOBLOX

    assert hostname and host_addresses, "No host specified to validate. Either hostname or host_addresses missing."
    # Same service-type check as the other helpers, instead of an AttributeError from getattr below.
    assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type."
    service_params = getattr(ipam_params, service_type)
    domain_name = service_params.domain_name
    ipv4_addr = str(host_addresses.v4)
    ipv6_addr = str(host_addresses.v6)
    dns_view = service_params.dns_view

    # Find host record reference
    r = requests.get(
        f"{wapi(infoblox_params)}/record:host",
        params={
            "name": (hostname + domain_name).lower(),  # hostnames are lowercase
            "ipv4addr": ipv4_addr,
            "ipv6addr": ipv6_addr,
            "view": dns_view,
        },
        auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
        verify=False,  # noqa: S501
        timeout=REQUESTS_TIMEOUT,
    )
    assert 200 <= r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
    host_data = r.json()
    assert len(host_data) == 1, "Host to validate does not exist in IPAM."
    assert "_ref" in host_data[0], "Host to validate does not exist in IPAM."

    # Find cname records reference
    r = requests.get(
        f"{wapi(infoblox_params)}/record:cname",
        params={
            "canonical": hostname + domain_name,
            "view": dns_view,
        },
        auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
        verify=False,  # noqa: S501
        timeout=REQUESTS_TIMEOUT,
    )
    assert 200 <= r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
    cname_data = r.json()
    # The provided aliases must equal the ones found in IPAM (order-sensitive comparison).
    provided_cnames = [item + domain_name for item in cname_aliases]
    found_cnames = [item["name"] for item in cname_data if "name" in item]
    assert provided_cnames == found_cnames, "Provided CNAME alias names don't match the ones poiting to hostname."

    return host_addresses