"""The Infoblox service that allocates :term:`IPAM` resources used in :term:`GSO` products.""" import ipaddress from logging import getLogger from infoblox_client import connector, objects from infoblox_client.exceptions import ( InfobloxCannotCreateObject, InfobloxCannotUpdateObject, ) from gso.settings import IPAMParams, load_oss_params logger = getLogger(__name__) NULL_MAC = "00:00:00:00:00:00" class AllocationError(Exception): """Raised when Infoblox failed to allocate a resource.""" class DeletionError(Exception): """Raised when Infoblox failed to delete a resource.""" def _setup_connection() -> tuple[connector.Connector, IPAMParams]: """Set up a new connection with an Infoblox instance. :return: A tuple that has an Infoblox ``Connector`` instance, and :term:`IPAM` parameters. :rtype: tuple[:class:`Connector`, IPAMParams] """ oss = load_oss_params().IPAM options = { "host": oss.INFOBLOX.host, "username": oss.INFOBLOX.username, "password": oss.INFOBLOX.password, "wapi_version": oss.INFOBLOX.wapi_version, "ssl_verify": oss.INFOBLOX.scheme == "https", } return connector.Connector(options), oss def _allocate_network( conn: connector.Connector, dns_view: str, network_view: str, netmask: int, containers: list[str], comment: str | None = "", ) -> ipaddress.IPv4Network | ipaddress.IPv6Network: """Allocate a new network in Infoblox. The function will go over all given containers, and try to allocate a network within the available IP space. If no space is available, this method raises an :class:`AllocationError`. :param :class:`infoblox_client.connector.Connector` conn: An active Infoblox connection. :param str dns_view: The Infoblox ``dns_view`` in which the network should be allocated. :param str network_view: The Infoblox ``network_view`` where the network should be allocated. :param int netmask: The netmask of the desired network. Can be up to 32 for v4 networks, and 128 for v6 networks. :param list [str] containers: A list of network containers in which the network should be allocated, given in :term:`CIDR` notation. :param str comment: Optionally, a comment can be added to the network allocation. """ for container in [ipaddress.ip_network(con) for con in containers]: for network in container.subnets(new_prefix=netmask): if objects.Network.search(conn, network=str(network)) is None: created_net = objects.Network.create( conn, network=str(network), view=dns_view, network_view=network_view, comment=comment ) if created_net.response != "Infoblox Object already Exists": return ipaddress.ip_network(created_net.network) msg = f"IP container {container} appears to be full." logger.warning(msg) msg = f"Cannot allocate anything in {containers}, check whether any IP space is available." raise AllocationError(msg) def hostname_available(hostname: str) -> bool: """Check whether a hostname is still available **in Infoblox**. Check whether Infoblox already has a :class:`infoblox_client.objects.HostRecord` that matches the given hostname. .. warning:: This method only checks within the Infoblox instance, and not the rest of the internet. The hostname could therefore still be taken elsewhere. :param hostname: The hostname to be checked. :type hostname: str """ conn, _ = _setup_connection() return objects.HostRecord.search(conn, name=hostname) is None def allocate_v4_network(service_type: str, comment: str | None = "") -> ipaddress.IPv4Network: """Allocate a new IPv4 network in Infoblox. Allocate an IPv4 network for a specific service type. The service type should be defined in the :term:`OSS` parameters of :term:`GSO`, from which the containers and netmask will be used. :param service_type: The service type for which the network is allocated. :type service_type: str :param comment: A comment to be added to the allocated network in Infoblox. :type comment: str, optional """ conn, oss = _setup_connection() netmask = getattr(oss, service_type).V4.mask containers = getattr(oss, service_type).V4.containers dns_view = getattr(oss, service_type).dns_view network_view = getattr(oss, service_type).network_view return ipaddress.IPv4Network(_allocate_network(conn, dns_view, network_view, netmask, containers, comment)) def allocate_v6_network(service_type: str, comment: str | None = "") -> ipaddress.IPv6Network: """Allocate a new IPv6 network in Infoblox. Allocate an IPv6 network for a specific service type. The service type should be defined in the :term:`OSS` parameters of :term:`GSO`, from which the containers and netmask will be used. :param service_type: The service type for which the network is allocated. :type service_type: str :param comment: A comment to be added to the allocated network in Infoblox. :type comment: str, optional """ conn, oss = _setup_connection() netmask = getattr(oss, service_type).V6.mask containers = getattr(oss, service_type).V6.containers dns_view = getattr(oss, service_type).dns_view network_view = getattr(oss, service_type).network_view return ipaddress.IPv6Network(_allocate_network(conn, dns_view, network_view, netmask, containers, comment)) def find_network_by_cidr( ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network, ) -> objects.Network | None: """Find a network in Infoblox by its :term:`CIDR`. :param ip_network: The :term:`CIDR` that is searched. :type ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network """ conn, _ = _setup_connection() return objects.Network.search(conn, cidr=str(ip_network)) def delete_network(ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network) -> None: """Delete a network in Infoblox. Delete a network that is allocated in Infoblox, by passing the :term:`CIDR` to be deleted. The :term:`CIDR` must exactly match an existing entry in Infoblox, otherwise this method raises a :class:`DeletionError` :param ip_network: The network that should get deleted. :type ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network """ network = find_network_by_cidr(ip_network) if network: network.delete() else: msg = f"Could not find network {ip_network}, nothing has been deleted." raise DeletionError(msg) def allocate_host( hostname: str, service_type: str, cname_aliases: list[str], comment: str ) -> tuple[ipaddress.IPv4Address, ipaddress.IPv6Address]: """Allocate a new host record in Infoblox. Create a new host record in Infoblox, by providing a hostname, and the service type that is associated with this new host. Most likely to be a loopback interface. If the hostname is not available in Infoblox (due to a potential collision) this method raises an :class:`AllocationError`. :param hostname: The :term:`FQDN` of the new host :type hostname: str :param service_type: The service type from which IP resources should be used. :type service_type: str :param cname_aliases: A list of any :term:`CNAME` aliases that should be associated with this host. Most often this will be a single loopback address. :type cname_aliases: list[str] :param comment: A comment that is added to the host record in Infoblox, should be the ``subscription_id`` of the new :class:`Router` subscription. :type comment: str """ if not hostname_available(hostname): msg = f"Cannot allocate new host, FQDN {hostname} already taken." raise AllocationError(msg) conn, oss = _setup_connection() allocation_networks_v4 = getattr(oss, service_type).V4.networks allocation_networks_v6 = getattr(oss, service_type).V6.networks dns_view = getattr(oss, service_type).dns_view network_view = getattr(oss, service_type).network_view created_v6 = None for ipv6_range in allocation_networks_v6: v6_alloc = objects.IPAllocation.next_available_ip_from_cidr(network_view, str(ipv6_range)) ipv6_object = objects.IP.create(ip=v6_alloc, mac=NULL_MAC, configure_for_dhcp=False) try: new_host = objects.HostRecord.create( conn, ip=ipv6_object, name=hostname, aliases=cname_aliases, comment=comment, view=dns_view, network_view=network_view, ) created_v6 = ipaddress.IPv6Address(new_host.ipv6addr) except InfobloxCannotCreateObject: msg = f"Cannot find 1 available IP address in network {ipv6_range}." logger.warning(msg) if created_v6 is None: msg = f"Cannot find 1 available IP address in networks {allocation_networks_v6}." raise AllocationError(msg) created_v4 = None for ipv4_range in allocation_networks_v4: v4_alloc = objects.IPAllocation.next_available_ip_from_cidr(network_view, str(ipv4_range)) ipv4_object = objects.IP.create(ip=v4_alloc, mac=NULL_MAC, configure_for_dhcp=False) new_host = objects.HostRecord.search(conn, name=hostname) new_host.ipv4addrs = [ipv4_object] try: new_host.update() new_host = objects.HostRecord.search(conn, name=hostname) created_v4 = ipaddress.IPv4Address(new_host.ipv4addr) except InfobloxCannotUpdateObject: msg = f"Cannot find 1 available IP address in network {ipv4_range}." logger.warning(msg) if created_v4 is None: msg = f"Cannot find 1 available IP address in networks {allocation_networks_v4}." raise AllocationError(msg) return created_v4, created_v6 def create_host_by_ip( hostname: str, ipv4_address: ipaddress.IPv4Address, ipv6_address: ipaddress.IPv6Address, service_type: str, comment: str, ) -> None: """Create a new host record with a given IPv4 and IPv6 address. :param str hostname: The :term:`FQDN` of the new host. :param IPv4Address ipv4_address: The IPv4 address of the new host. :param IPv6Address ipv6_address: The IPv6 address of the new host. :param str service_type: The relevant service type, used to deduce the correct ``dns_view`` and ``network_view`` in Infoblox. :param str comment: The comment stored in this Infoblox record, most likely the relevant ``subscription_id`` in :term:`GSO`. """ if not hostname_available(hostname): msg = f"Cannot allocate new host, FQDN {hostname} already taken." raise AllocationError(msg) conn, oss = _setup_connection() ipv6_object = objects.IP.create(ip=str(ipv6_address), mac=NULL_MAC, configure_for_dhcp=False) ipv4_object = objects.IP.create(ip=str(ipv4_address), mac=NULL_MAC, configure_for_dhcp=False) dns_view = getattr(oss, service_type).dns_view network_view = getattr(oss, service_type).network_view # This needs to be done in two steps, otherwise only one of the IP addresses is stored. objects.HostRecord.create( conn, ip=ipv6_object, name=hostname, comment=comment, view=dns_view, network_view=network_view ) new_host = find_host_by_fqdn(hostname) new_host.ipv4addrs = [ipv4_object] new_host.update() def find_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> objects.HostRecord | None: """Find a host record in Infoblox by its associated IP address. :param ip_addr: The IP address of a host that is searched for. :type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address """ conn, _ = _setup_connection() if ip_addr.version == 4: # noqa: PLR2004, the 4 in IPv4 is well-known and not a "magic value." return objects.HostRecord.search( conn, ipv4addr=ip_addr, return_fields=["ipv4addrs", "name", "view", "aliases", "comment"], ) return objects.HostRecordV6.search( conn, ipv6addr=ip_addr, return_fields=["ipv6addrs", "name", "view", "aliases", "comment"], ) def find_host_by_fqdn(fqdn: str) -> objects.HostRecord: """Find a host record by its associated :term:`FQDN`. :param fqdn: The :term:`FQDN` of a host that is searched for. :type fqdn: str """ conn, _ = _setup_connection() return objects.HostRecord.search( conn, name=fqdn, return_fields=["ipv4addrs", "name", "view", "aliases", "comment"], ) def find_v6_host_by_fqdn(fqdn: str) -> objects.HostRecordV6: """Find a host record by its associated :term:`FQDN`. This specific method will return the IPv6 variant of a record, if it exists. :param str fqdn: The :term:`FQDN` of a host that is searched for. """ conn, _ = _setup_connection() return objects.HostRecordV6.search( conn, name=fqdn, return_fields=["ipv6addrs", "name", "view", "aliases", "comment"] ) def delete_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> None: """Delete a host from Infoblox. Delete a host record in Infoblox, by providing the IP address that is associated with the record. Raises a :class:`DeletionError` if no record can be found in Infoblox. :param ip_addr: The IP address of the host record that should get deleted. :type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address """ host = find_host_by_ip(ip_addr) if host: host.delete() else: msg = f"Could not find host at {ip_addr}, nothing has been deleted." raise DeletionError(msg) def delete_host_by_fqdn(fqdn: str) -> None: """Delete a host from Infoblox. Delete a host record in Infoblox, by providing the :term:`FQDN` that is associated with the record. Raises a :class:`DeletionError` if no record can be found in Infoblox. :param fqdn: The :term:`FQDN` of the host record that should get deleted. :type fqdn: str """ host = find_host_by_fqdn(fqdn) if host: host.delete() else: msg = f"Could not find host at {fqdn}, nothing has been deleted." raise DeletionError(msg)