import ipaddress from logging import getLogger from infoblox_client import connector, objects from infoblox_client.exceptions import InfobloxCannotCreateObject, InfobloxCannotUpdateObject from gso.settings import IPAMParams, load_oss_params logger = getLogger(__name__) class AllocationError(Exception): pass class DeletionError(Exception): pass def _setup_connection() -> tuple[connector.Connector, IPAMParams]: """Set up a new connection with an Infoblox instance. :return: A tuple that has an Infoblox ``Connector`` instance, and :term:`IPAM` parameters. :rtype: tuple[:class:`Connector`, IPAMParams] """ oss = load_oss_params().IPAM options = { "host": oss.INFOBLOX.host, "username": oss.INFOBLOX.username, "password": oss.INFOBLOX.password, "wapi_version": oss.INFOBLOX.wapi_version, "ssl_verify": True if oss.INFOBLOX.scheme == "https" else False, } return connector.Connector(options), oss def _allocate_network( conn: connector.Connector, dns_view: str, netmask: int, containers: list[str], comment: str | None = "", ) -> ipaddress.IPv4Network | ipaddress.IPv6Network: """Allocate a new network in Infoblox. The function will go over all given containers, and try to allocate a network within the available IP space. If no space is available, this method raises an :class:`AllocationError`. :param conn: An active Infoblox connection. :type conn: :class:`infoblox_client.connector.Connector` :param dns_view: The Infoblox `dns_view` in which the network should be allocated. :type dns_view: str :param netmask: The netmask of the desired network. Can be up to 32 for v4 networks, and 128 for v6 networks. :type netmask: int :param containers: A list of network containers in which the network should be allocated, given in :term:`CIDR` notation. :type containers: list[str] :param comment: Optionally, a comment can be added to the network allocation. :type comment: str, optional """ for container in [ipaddress.ip_network(con) for con in containers]: for network in container.subnets(new_prefix=netmask): if objects.Network.search(conn, network=str(network)) is None: created_net = objects.Network.create(conn, network=str(network), dns_view=dns_view, comment=comment) if created_net.response != "Infoblox Object already Exists": return ipaddress.ip_network(created_net.network) logger.warning(f"IP container {container} appears to be full.") raise AllocationError(f"Cannot allocate anything in {containers}, check whether any IP space is available.") def hostname_available(hostname: str) -> bool: """Check whether a hostname is still available **in Infoblox**. Check whether Infoblox already has a :class:`infoblox_client.objects.HostRecord` that matches the given hostname. .. warning:: This method only checks within the Infoblox instance, and not the rest of the internet. The hostname could therefore still be taken elsewhere. :param hostname: The hostname to be checked. :type hostname: str """ conn, _ = _setup_connection() return objects.HostRecord.search(conn, name=hostname) is None def allocate_v4_network(service_type: str, comment: str | None = "") -> ipaddress.IPv4Network: """Allocate a new IPv4 network in Infoblox. Allocate an IPv4 network for a specific service type. The service type should be defined in the :term:`OSS` parameters of :term:`GSO`, from which the containers and netmask will be used. :param service_type: The service type for which the network is allocated. :type service_type: str :param comment: A comment to be added to the allocated network in Infoblox. :type comment: str, optional """ conn, oss = _setup_connection() netmask = getattr(oss, service_type).V4.mask containers = getattr(oss, service_type).V4.containers dns_view = getattr(oss, service_type).dns_view return ipaddress.IPv4Network(_allocate_network(conn, dns_view, netmask, containers, comment)) def allocate_v6_network(service_type: str, comment: str | None = "") -> ipaddress.IPv6Network: """Allocate a new IPv6 network in Infoblox. Allocate an IPv6 network for a specific service type. The service type should be defined in the :term:`OSS` parameters of :term:`GSO`, from which the containers and netmask will be used. :param service_type: The service type for which the network is allocated. :type service_type: str :param comment: A comment to be added to the allocated network in Infoblox. :type comment: str, optional """ conn, oss = _setup_connection() netmask = getattr(oss, service_type).V6.mask containers = getattr(oss, service_type).V6.containers dns_view = getattr(oss, service_type).dns_view return ipaddress.IPv6Network(_allocate_network(conn, dns_view, netmask, containers, comment)) def find_network_by_cidr(ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network) -> objects.Network | None: """Find a network in Infoblox by its :term:`CIDR`. :param ip_network: The :term:`CIDR` that is searched. :type ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network """ conn, _ = _setup_connection() return objects.Network.search(conn, cidr=str(ip_network)) def delete_network(ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network) -> None: """Delete a network in Infoblox. Delete a network that is allocated in Infoblox, by passing the :term:`CIDR` to be deleted. The :term:`CIDR` must exactly match an existing entry in Infoblox, otherwise this method raises a :class:`DeletionError` :param ip_network: The network that should get deleted. :type ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network """ network = find_network_by_cidr(ip_network) if network: network.delete() else: raise DeletionError(f"Could not find network {ip_network}, nothing has been deleted.") def allocate_host( hostname: str, service_type: str, cname_aliases: list[str], comment: str ) -> tuple[ipaddress.IPv4Address, ipaddress.IPv6Address]: """Allocate a new host record in Infoblox. Create a new host record in Infoblox, by providing a hostname, and the service type that is associated with this new host. Most likely to be a loopback interface. If the hostname is not available in Infoblox (due to a potential collision) this method raises an :class:`AllocationError`. :param hostname: The :term:`FQDN` of the new host :type hostname: str :param service_type: The service type from which IP resources should be used. :type service_type: str :param cname_aliases: A list of any :term:`CNAME` aliases that should be associated with this host. Most often this will be a single loopback address. :type cname_aliases: list[str] :param comment: A comment that is added to the host record in Infoblox, should be the `subscription_id` of the new :class:`Router` subscription. :type comment: str """ if not hostname_available(hostname): raise AllocationError(f"Cannot allocate new host, FQDN {hostname} already taken.") conn, oss = _setup_connection() allocation_networks_v4 = getattr(oss, service_type).V4.networks allocation_networks_v6 = getattr(oss, service_type).V6.networks dns_view = getattr(oss, service_type).dns_view created_v6 = None for ipv6_range in allocation_networks_v6: v6_alloc = objects.IPAllocation.next_available_ip_from_cidr(dns_view, str(ipv6_range)) ipv6_object = objects.IP.create(ip=v6_alloc, mac="00:00:00:00:00:00", configure_for_dhcp=False) try: new_host = objects.HostRecord.create( conn, ip=ipv6_object, name=hostname, aliases=cname_aliases, comment=comment, dns_view=dns_view ) created_v6 = ipaddress.IPv6Address(new_host.ipv6addr) except InfobloxCannotCreateObject: logger.warning(f"Cannot find 1 available IP address in network {ipv6_range}.") if created_v6 is None: raise AllocationError(f"Cannot find 1 available IP address in networks {allocation_networks_v6}.") created_v4 = None for ipv4_range in allocation_networks_v4: v4_alloc = objects.IPAllocation.next_available_ip_from_cidr(dns_view, str(ipv4_range)) ipv4_object = objects.IP.create(ip=v4_alloc, mac="00:00:00:00:00:00", configure_for_dhcp=False) new_host = objects.HostRecord.search(conn, name=hostname) new_host.ipv4addrs = [ipv4_object] try: new_host.update() new_host = objects.HostRecord.search(conn, name=hostname) created_v4 = ipaddress.IPv4Address(new_host.ipv4addr) except InfobloxCannotUpdateObject: logger.warning(f"Cannot find 1 available IP address in network {ipv4_range}.") if created_v4 is None: raise AllocationError(f"Cannot find 1 available IP address in networks {allocation_networks_v4}.") return created_v4, created_v6 def find_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> objects.HostRecord | None: """Find a host record in Infoblox by its associated IP address. :param ip_addr: The IP address of a host that is searched for. :type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address """ conn, _ = _setup_connection() if ip_addr.version == 4: return objects.HostRecord.search( conn, ipv4addr=ip_addr, return_fields=["ipv4addrs", "name", "view", "aliases", "comment"] ) return objects.HostRecord.search( conn, ipv6addr=ip_addr, return_fields=["ipv6addrs", "name", "view", "aliases", "comment"] ) def find_host_by_fqdn(fqdn: str) -> objects.HostRecord | None: """Find a host record by its associated :term:`FQDN`. :param fqdn: The :term:`FQDN` of a host that is searched for. :type fqdn: str """ conn, _ = _setup_connection() return objects.HostRecord.search(conn, name=fqdn, return_fields=["ipv4addrs", "name", "view", "aliases", "comment"]) def delete_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> None: """Delete a host from Infoblox. Delete a host record in Infoblox, by providing the IP address that is associated with the record. Raises a :class:`DeletionError` if no record can be found in Infoblox. :param ip_addr: The IP address of the host record that should get deleted. :type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address """ host = find_host_by_ip(ip_addr) if host: host.delete() else: raise DeletionError(f"Could not find host at {ip_addr}, nothing has been deleted.") def delete_host_by_fqdn(fqdn: str) -> None: """Delete a host from Infoblox. Delete a host record in Infoblox, by providing the :term:`FQDN` that is associated with the record. Raises a :class:`DeletionError` if no record can be found in Infoblox. :param fqdn: The :term:`FQDN` of the host record that should get deleted. :type fqdn: str """ host = find_host_by_fqdn(fqdn) if host: host.delete() else: raise DeletionError(f"Could not find host at {fqdn}, nothing has been deleted.")