# infoblox.py
"""The Infoblox service that allocates :term:`IPAM` resources used in :term:`GSO` products."""

import ipaddress
from logging import getLogger

from infoblox_client import connector, objects
from infoblox_client.exceptions import (
    InfobloxCannotCreateObject,
    InfobloxCannotUpdateObject,
)

from gso.settings import IPAMParams, load_oss_params

# Module-level logger, named after this module.
logger = getLogger(__name__)
# All-zero MAC address that is passed to ``objects.IP.create`` when building the IP objects of a host record.
NULL_MAC = "00:00:00:00:00:00"


class AllocationError(Exception):
    """Raised when Infoblox failed to allocate a resource.

    Within this module it is raised when no network could be allocated in any of the given containers, when a
    requested hostname is already taken, or when no IP address could be assigned to a new host.
    """


class DeletionError(Exception):
    """Raised when Infoblox failed to delete a resource.

    Within this module it is raised by the delete functions when the network or host record that should be removed
    cannot be found.
    """


def _setup_connection() -> tuple[connector.Connector, IPAMParams]:
    """Create an Infoblox connection from the :term:`IPAM` section of the :term:`OSS` parameters.

    The ``ssl_verify`` option handed to the connector is only true when the configured scheme equals ``https``.

    :return: A tuple holding the new Infoblox ``Connector`` and the :term:`IPAM` parameters it was built from.
    :rtype: tuple[:class:`Connector`, IPAMParams]
    """
    ipam_params = load_oss_params().IPAM
    infoblox = ipam_params.INFOBLOX
    connection = connector.Connector(
        {
            "host": infoblox.host,
            "username": infoblox.username,
            "password": infoblox.password,
            "wapi_version": infoblox.wapi_version,
            "ssl_verify": infoblox.scheme == "https",
        }
    )
    return connection, ipam_params


def _allocate_network(
    conn: connector.Connector,
    dns_view: str,
    network_view: str,
    netmask: int,
    containers: list[str],
    comment: str | None = "",
) -> ipaddress.IPv4Network | ipaddress.IPv6Network:
    """Allocate the first free network of the requested size in one of the given containers.

    Every container is split into subnets of size ``netmask``. The first subnet that Infoblox does not know yet, and
    that can be created without Infoblox reporting it as already existing, is returned. A warning is logged for every
    container that yields no usable subnet.

    :param :class:`infoblox_client.connector.Connector` conn: An active Infoblox connection.
    :param str dns_view: The Infoblox ``dns_view`` in which the network should be allocated.
    :param str network_view: The Infoblox ``network_view`` where the network should be allocated.
    :param int netmask: The prefix length of the desired network, used as ``new_prefix`` when splitting a container.
    :param list [str] containers: The network containers to allocate from, given in :term:`CIDR` notation. They are
                                  tried in the order given.
    :param str comment: Optionally, a comment that is stored with the network in Infoblox.
    :return: The network that was created in Infoblox.
    :raises AllocationError: If none of the containers has room for a network of the requested size.
    """
    # All containers are parsed up front, so a malformed CIDR is reported before anything is created.
    parsed_containers = [ipaddress.ip_network(cidr) for cidr in containers]

    for container in parsed_containers:
        for candidate in container.subnets(new_prefix=netmask):
            if objects.Network.search(conn, network=str(candidate)) is not None:
                # Infoblox already has this subnet, move on to the next one.
                continue

            created_net = objects.Network.create(
                conn, network=str(candidate), view=dns_view, network_view=network_view, comment=comment
            )
            if created_net.response == "Infoblox Object already Exists":
                # The subnet turned out to exist after all, so it is not ours to hand out.
                continue

            return ipaddress.ip_network(created_net.network)

        full_msg = f"IP container {container} appears to be full."
        logger.warning(full_msg)

    error_msg = f"Cannot allocate anything in {containers}, check whether any IP space is available."
    raise AllocationError(error_msg)


def hostname_available(hostname: str) -> bool:
    """Tell whether a hostname is still unused **in Infoblox**.

    A hostname counts as available when searching Infoblox for a :class:`infoblox_client.objects.HostRecord` with that
    name yields nothing.

    .. warning::
       Only the Infoblox instance is consulted, not the rest of the internet. The hostname could therefore still be
       taken elsewhere.

    :param hostname: The hostname to be checked.
    :type hostname: str
    :return: ``True`` if no host record with this name was found, ``False`` otherwise.
    """
    connection = _setup_connection()[0]
    existing_record = objects.HostRecord.search(connection, name=hostname)
    return existing_record is None


def allocate_v4_network(service_type: str, comment: str | None = "") -> ipaddress.IPv4Network:
    """Allocate a new IPv4 network in Infoblox for a service type.

    The service type must be an attribute of the :term:`IPAM` section in the :term:`OSS` parameters of :term:`GSO`.
    Its ``V4`` mask and containers, together with its ``dns_view`` and ``network_view``, determine where the network
    is allocated.

    :param service_type: The service type for which the network is allocated.
    :type service_type: str
    :param comment: A comment to be added to the allocated network in Infoblox.
    :type comment: str, optional
    :return: The newly allocated IPv4 network.
    """
    conn, oss = _setup_connection()
    service_params = getattr(oss, service_type)

    new_network = _allocate_network(
        conn=conn,
        netmask=service_params.V4.mask,
        containers=service_params.V4.containers,
        dns_view=service_params.dns_view,
        network_view=service_params.network_view,
        comment=comment,
    )
    return ipaddress.IPv4Network(new_network)


def allocate_v6_network(service_type: str, comment: str | None = "") -> ipaddress.IPv6Network:
    """Allocate a new IPv6 network in Infoblox for a service type.

    The service type must be an attribute of the :term:`IPAM` section in the :term:`OSS` parameters of :term:`GSO`.
    Its ``V6`` mask and containers, together with its ``dns_view`` and ``network_view``, determine where the network
    is allocated.

    :param service_type: The service type for which the network is allocated.
    :type service_type: str
    :param comment: A comment to be added to the allocated network in Infoblox.
    :type comment: str, optional
    :return: The newly allocated IPv6 network.
    """
    conn, oss = _setup_connection()
    service_params = getattr(oss, service_type)

    new_network = _allocate_network(
        conn=conn,
        netmask=service_params.V6.mask,
        containers=service_params.V6.containers,
        dns_view=service_params.dns_view,
        network_view=service_params.network_view,
        comment=comment,
    )
    return ipaddress.IPv6Network(new_network)


def find_network_by_cidr(
    ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network,
) -> objects.Network | None:
    """Look up a network in Infoblox by its :term:`CIDR`.

    :param ip_network: The :term:`CIDR` that is searched.
    :type ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network
    :return: The result of the Infoblox search for this :term:`CIDR`, ``None`` when there is no match.
    """
    connection = _setup_connection()[0]
    cidr = str(ip_network)
    return objects.Network.search(connection, cidr=cidr)


def delete_network(ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network) -> None:
    """Remove a network from Infoblox.

    The network is looked up by the given :term:`CIDR` and deleted when it is found. When the lookup comes back
    empty, nothing is deleted and a :class:`DeletionError` is raised instead.

    :param ip_network: The network that should get deleted.
    :type ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network
    :raises DeletionError: If no network matching ``ip_network`` is found in Infoblox.
    """
    found_network = find_network_by_cidr(ip_network)
    if not found_network:
        msg = f"Could not find network {ip_network}, nothing has been deleted."
        raise DeletionError(msg)

    found_network.delete()


def allocate_host(
    hostname: str, service_type: str, cname_aliases: list[str], comment: str
) -> tuple[ipaddress.IPv4Address, ipaddress.IPv6Address]:
    """Allocate a new host record in Infoblox.

    Create a new host record in Infoblox, by providing a hostname, and the service type that is associated with this new
    host. Most likely to be a loopback interface. If the hostname is not available in Infoblox (due to a potential
    collision) this method raises an :class:`AllocationError`.

    The record is built in two steps. First, a host record is created with an IPv6 address taken from the first
    configured ``V6`` network in which Infoblox can create it. Then, that record is updated with an IPv4 address taken
    from the first configured ``V4`` network for which the update succeeds. Networks are tried in the order in which
    they are configured, and the search stops at the first success.

    :param hostname: The :term:`FQDN` of the new host
    :type hostname: str
    :param service_type: The service type from which IP resources should be used.
    :type service_type: str
    :param cname_aliases: A list of any :term:`CNAME` aliases that should be associated with this host. Most often this
                          will be a single loopback address.
    :type cname_aliases: list[str]
    :param comment: A comment that is added to the host record in Infoblox, should be the ``subscription_id`` of the new
                    :class:`Router` subscription.
    :type comment: str
    :return: A tuple with the IPv4 address and the IPv6 address of the new host record, in that order.
    :raises AllocationError: If the hostname is already taken, or if no IPv6 or no IPv4 address could be allocated in
                             any of the configured networks.
    """
    if not hostname_available(hostname):
        msg = f"Cannot allocate new host, FQDN {hostname} already taken."
        raise AllocationError(msg)

    conn, oss = _setup_connection()
    service_params = getattr(oss, service_type)
    allocation_networks_v4 = service_params.V4.networks
    allocation_networks_v6 = service_params.V6.networks
    dns_view = service_params.dns_view
    network_view = service_params.network_view

    created_v6 = None
    for ipv6_range in allocation_networks_v6:
        v6_alloc = objects.IPAllocation.next_available_ip_from_cidr(network_view, str(ipv6_range))
        ipv6_object = objects.IP.create(ip=v6_alloc, mac=NULL_MAC, configure_for_dhcp=False)
        try:
            new_host = objects.HostRecord.create(
                conn,
                ip=ipv6_object,
                name=hostname,
                aliases=cname_aliases,
                comment=comment,
                view=dns_view,
                network_view=network_view,
            )
            created_v6 = ipaddress.IPv6Address(new_host.ipv6addr)
        except InfobloxCannotCreateObject:
            msg = f"Cannot find 1 available IP address in network {ipv6_range}."
            logger.warning(msg)
        else:
            # The host record exists now, the remaining ranges must not be tried anymore.
            break

    if created_v6 is None:
        msg = f"Cannot find 1 available IP address in networks {allocation_networks_v6}."
        raise AllocationError(msg)

    # NOTE(review): from here on the host record with its IPv6 address exists in Infoblox. It is not rolled back when
    # no IPv4 address can be assigned below, so a retry with the same hostname will find the name taken.
    created_v4 = None
    for ipv4_range in allocation_networks_v4:
        v4_alloc = objects.IPAllocation.next_available_ip_from_cidr(network_view, str(ipv4_range))
        ipv4_object = objects.IP.create(ip=v4_alloc, mac=NULL_MAC, configure_for_dhcp=False)
        new_host = objects.HostRecord.search(conn, name=hostname)
        new_host.ipv4addrs = [ipv4_object]
        try:
            new_host.update()
            # Search again, so the address that was actually assigned can be read back from the record.
            new_host = objects.HostRecord.search(conn, name=hostname)
            created_v4 = ipaddress.IPv4Address(new_host.ipv4addr)
        except InfobloxCannotUpdateObject:
            msg = f"Cannot find 1 available IP address in network {ipv4_range}."
            logger.warning(msg)
        else:
            # Without stopping here, every later range would overwrite the address that was just assigned.
            break

    if created_v4 is None:
        msg = f"Cannot find 1 available IP address in networks {allocation_networks_v4}."
        raise AllocationError(msg)

    return created_v4, created_v6


def create_host_by_ip(
    hostname: str,
    ipv4_address: ipaddress.IPv4Address,
    ipv6_address: ipaddress.IPv6Address,
    service_type: str,
    comment: str,
) -> None:
    """Create a host record in Infoblox that uses the given IPv4 and IPv6 address.

    :param str hostname: The :term:`FQDN` of the new host.
    :param IPv4Address ipv4_address: The IPv4 address of the new host.
    :param IPv6Address ipv6_address: The IPv6 address of the new host.
    :param str service_type: The relevant service type, used to deduce the correct ``dns_view`` and ``network_view`` in
                             Infoblox.
    :param str comment: The comment stored in this Infoblox record, most likely the relevant ``subscription_id`` in
                        :term:`GSO`.
    :raises AllocationError: If the hostname is already taken in Infoblox.
    """
    if not hostname_available(hostname):
        msg = f"Cannot allocate new host, FQDN {hostname} already taken."
        raise AllocationError(msg)

    conn, oss = _setup_connection()
    v6_ip = objects.IP.create(ip=str(ipv6_address), mac=NULL_MAC, configure_for_dhcp=False)
    v4_ip = objects.IP.create(ip=str(ipv4_address), mac=NULL_MAC, configure_for_dhcp=False)
    service_params = getattr(oss, service_type)

    # The record is created with the IPv6 address first and gets its IPv4 address in a second step, otherwise only one
    # of the IP addresses is stored.
    objects.HostRecord.create(
        conn,
        ip=v6_ip,
        name=hostname,
        comment=comment,
        view=service_params.dns_view,
        network_view=service_params.network_view,
    )
    host_record = find_host_by_fqdn(hostname)
    host_record.ipv4addrs = [v4_ip]
    host_record.update()


def find_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> objects.HostRecord | None:
    """Look up a host record in Infoblox by one of its IP addresses.

    An IPv4 address is searched as a ``HostRecord``, any other address as a ``HostRecordV6``.

    :param ip_addr: The IP address of a host that is searched for.
    :type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address
    :return: The result of the Infoblox search, ``None`` when there is no match.
    """
    connection = _setup_connection()[0]
    shared_fields = ["name", "view", "aliases", "comment"]

    if ip_addr.version != 4:  # noqa: PLR2004, the 4 in IPv4 is well-known and not a "magic value."
        return objects.HostRecordV6.search(
            connection,
            ipv6addr=ip_addr,
            return_fields=["ipv6addrs", *shared_fields],
        )

    return objects.HostRecord.search(
        connection,
        ipv4addr=ip_addr,
        return_fields=["ipv4addrs", *shared_fields],
    )


def find_host_by_fqdn(fqdn: str) -> objects.HostRecord:
    """Look up a host record in Infoblox by its :term:`FQDN`.

    The record is requested with its ``ipv4addrs``, ``name``, ``view``, ``aliases`` and ``comment`` fields.

    :param fqdn: The :term:`FQDN` of a host that is searched for.
    :type fqdn: str
    :return: The result of the Infoblox search. Presumably this is ``None`` when nothing matches, since callers in
             this module check for an empty result.
    """
    connection = _setup_connection()[0]
    wanted_fields = ["ipv4addrs", "name", "view", "aliases", "comment"]
    return objects.HostRecord.search(connection, name=fqdn, return_fields=wanted_fields)


def find_v6_host_by_fqdn(fqdn: str) -> objects.HostRecordV6:
    """Look up the IPv6 variant of a host record in Infoblox by its :term:`FQDN`.

    The record is requested with its ``ipv6addrs``, ``name``, ``view``, ``aliases`` and ``comment`` fields.

    :param str fqdn: The :term:`FQDN` of a host that is searched for.
    :return: The result of the Infoblox search for a ``HostRecordV6`` with this name.
    """
    connection = _setup_connection()[0]
    wanted_fields = ["ipv6addrs", "name", "view", "aliases", "comment"]
    return objects.HostRecordV6.search(connection, name=fqdn, return_fields=wanted_fields)


def delete_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> None:
    """Remove a host record from Infoblox, identified by its IP address.

    The host record is looked up by the given IP address and deleted when it is found. When the lookup comes back
    empty, nothing is deleted and a :class:`DeletionError` is raised instead.

    :param ip_addr: The IP address of the host record that should get deleted.
    :type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address
    :raises DeletionError: If no host record is found for ``ip_addr``.
    """
    found_host = find_host_by_ip(ip_addr)
    if not found_host:
        msg = f"Could not find host at {ip_addr}, nothing has been deleted."
        raise DeletionError(msg)

    found_host.delete()


def delete_host_by_fqdn(fqdn: str) -> None:
    """Remove a host record from Infoblox, identified by its :term:`FQDN`.

    The host record is looked up by the given :term:`FQDN` and deleted when it is found. When the lookup comes back
    empty, nothing is deleted and a :class:`DeletionError` is raised instead.

    :param fqdn: The :term:`FQDN` of the host record that should get deleted.
    :type fqdn: str
    :raises DeletionError: If no host record is found for ``fqdn``.
    """
    found_host = find_host_by_fqdn(fqdn)
    if not found_host:
        msg = f"Could not find host at {fqdn}, nothing has been deleted."
        raise DeletionError(msg)

    found_host.delete()