diff --git a/gso/services/infoblox.py b/gso/services/infoblox.py index 652e63f6812857b22ca7175c39c74273dfe4554f..1011ffac51e3a984fe96bdb3ad949c8f842e527d 100644 --- a/gso/services/infoblox.py +++ b/gso/services/infoblox.py @@ -1,44 +1,135 @@ import ipaddress +from logging import getLogger from infoblox_client import connector, objects +from infoblox_client.exceptions import InfobloxCannotCreateObject, InfobloxCannotUpdateObject -from gso.settings import load_oss_params +from gso.settings import IPAMParams, load_oss_params +logger = getLogger(__name__) -def _setup_connection() -> connector.Connector: - oss = load_oss_params() + +class AllocationError(Exception): + pass + + +class DeletionError(Exception): + pass + + +def _setup_connection() -> tuple[connector.Connector, IPAMParams]: + oss = load_oss_params().IPAM options = { - "host": oss.IPAM.INFOBLOX.host, - "username": oss.IPAM.INFOBLOX.username, - "password": oss.IPAM.INFOBLOX.password, - "wapi_version": oss.IPAM.INFOBLOX.wapi_version[1:], # remove the 'v' in front of the version number - "ssl_verify": True if oss.IPAM.INFOBLOX.scheme == "https" else False, + "host": oss.INFOBLOX.host, + "username": oss.INFOBLOX.username, + "password": oss.INFOBLOX.password, + "wapi_version": oss.INFOBLOX.wapi_version[1:], # remove the 'v' in front of the version number + "ssl_verify": True if oss.INFOBLOX.scheme == "https" else False, } - return connector.Connector(options) + return connector.Connector(options), oss -def allocate_network(ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network) -> objects.Network: - conn = _setup_connection() - return objects.Network.create(conn, str(ip_network)) +def _allocate_network( + conn: connector.Connector, + dns_view: str, + netmask: int, + containers: list[str], + comment: str, +) -> ipaddress.IPv4Network | ipaddress.IPv6Network: + for container in [ipaddress.ip_network(con) for con in containers]: + for network in container.subnets(new_prefix=netmask): + if objects.Network.search(conn, network=str(network)) is None: + created_net = objects.Network.create(conn, network=str(network), dns_view=dns_view, comment=comment) + if created_net.response != "Infoblox Object already Exists": + return ipaddress.ip_network(created_net.network) + logger.warning(f"IP container {container} appears to be full.") + + raise AllocationError(f"Cannot allocate anything in {containers}, check whether any IP space is available.") + + +def hostname_available(hostname: str) -> bool: + conn, _ = _setup_connection() + return objects.HostRecord.search(conn, name=hostname) is None + + +def allocate_v4_network(service_type: str, comment: str | None = "") -> ipaddress.IPv4Network: + conn, oss = _setup_connection() + netmask = getattr(oss, service_type).V4.mask + containers = getattr(oss, service_type).V4.containers + dns_view = getattr(oss, service_type).dns_view + + return _allocate_network(conn, dns_view, netmask, containers, comment) + + +def allocate_v6_network(service_type: str, comment: str | None = "") -> ipaddress.IPv6Network: + conn, oss = _setup_connection() + netmask = getattr(oss, service_type).V6.mask + containers = getattr(oss, service_type).V6.containers + dns_view = getattr(oss, service_type).dns_view + + return _allocate_network(conn, dns_view, netmask, containers, comment) def delete_network(ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network) -> None: conn = _setup_connection() network = objects.Network.search(conn, cidr=str(ip_network)) - network.delete() + if network: + network.delete() + else: + raise DeletionError(f"Could not find network {ip_network}, nothing has been deleted.") -def allocate_host(hostname: str, net_cidr: ipaddress.IPv4Network | ipaddress.IPv6Network) -> objects.HostRecord: - conn = _setup_connection() - address = objects.IPAllocation.next_available_ip_from_cidr("default", net_cidr) - ip_object = objects.IP.create(address, "00:00:00:00:00:00", configure_for_dhcp=False) - return objects.HostRecord.create(conn, name=hostname, ip=ip_object) +def allocate_host( + hostname: str, service_type: str, cname_aliases: list[str], comment: str | None = "" +) -> tuple[ipaddress.IPv4Address, ipaddress.IPv6Address]: + if not hostname_available(hostname): + raise AllocationError(f"Cannot allocate new host, FQDN {hostname} already taken.") + + conn, oss = _setup_connection() + allocation_networks_v4 = getattr(oss, service_type).V4.networks + allocation_networks_v6 = getattr(oss, service_type).V6.networks + dns_view = getattr(oss, service_type).dns_view + + created_v6 = None + for ipv6_range in allocation_networks_v6: + v6_alloc = objects.IPAllocation.next_available_ip_from_cidr(dns_view, ipv6_range) + ipv6_object = objects.IP.create(ip=v6_alloc, mac="00:00:00:00:00:00", configure_for_dhcp=False) + try: + new_host = objects.HostRecord.create( + conn, ip=ipv6_object, name=hostname, aliases=cname_aliases, comment=comment, dns_view=dns_view + ) + created_v6 = ipaddress.IPv6Address(new_host.ipv6addr) + except InfobloxCannotCreateObject: + logger.warning(f"Cannot find 1 available IP address in network {ipv6_range}.") + + if created_v6 is None: + raise AllocationError(f"Cannot find 1 available IP address in networks {allocation_networks_v6}.") + + created_v4 = None + for ipv4_range in allocation_networks_v4: + v4_alloc = objects.IPAllocation.next_available_ip_from_cidr(dns_view, ipv4_range) + ipv4_object = objects.IP.create(ip=v4_alloc, mac="00:00:00:00:00:00", configure_for_dhcp=False) + new_host = objects.HostRecord.search(conn, name=hostname) + new_host.ipv4addrs = [ipv4_object] + try: + new_host.update() + created_v4 = ipaddress.IPv4Address(new_host.ipv4addr) + except InfobloxCannotUpdateObject: + logger.warning(f"Cannot find 1 available IP address in network {ipv4_range}.") + + if created_v4 is None: + raise AllocationError(f"Cannot find 1 available IP address in networks {allocation_networks_v4}.") + + return created_v4, created_v6 def delete_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> None: conn = _setup_connection() host = objects.HostRecord.search(conn, ipv4addr=ip_addr) - host.delete() + if host: + host.delete() + else: + raise DeletionError(f"Could not find host at {ip_addr}, nothing has been deleted.") def delete_host_by_fqdn(fqdn: str) -> None: