-
Karel van Klink authoredKarel van Klink authored
infoblox.py 14.14 KiB
"""The Infoblox service that allocates :term:`IPAM` resources used in :term:`GSO` products."""
import ipaddress
from logging import getLogger
from infoblox_client import connector, objects
from infoblox_client.exceptions import (
InfobloxCannotCreateObject,
InfobloxCannotUpdateObject,
)
from gso.settings import IPAMParams, load_oss_params
logger = getLogger(__name__)
NULL_MAC = "00:00:00:00:00:00"
class AllocationError(Exception):
"""Raised when Infoblox failed to allocate a resource."""
class DeletionError(Exception):
"""Raised when Infoblox failed to delete a resource."""
def _setup_connection() -> tuple[connector.Connector, IPAMParams]:
"""Set up a new connection with an Infoblox instance.
:return: A tuple that has an Infoblox ``Connector`` instance, and :term:`IPAM` parameters.
:rtype: tuple[:class:`Connector`, IPAMParams]
"""
oss = load_oss_params().IPAM
options = {
"host": oss.INFOBLOX.host,
"username": oss.INFOBLOX.username,
"password": oss.INFOBLOX.password,
"wapi_version": oss.INFOBLOX.wapi_version,
"ssl_verify": oss.INFOBLOX.scheme == "https",
}
return connector.Connector(options), oss
def _allocate_network(
conn: connector.Connector,
dns_view: str,
network_view: str,
netmask: int,
containers: list[str],
comment: str | None = "",
) -> ipaddress.IPv4Network | ipaddress.IPv6Network:
"""Allocate a new network in Infoblox.
The function will go over all given containers, and try to allocate a network within the available IP space. If no
space is available, this method raises an :class:`AllocationError`.
:param :class:`infoblox_client.connector.Connector` conn: An active Infoblox connection.
:param str dns_view: The Infoblox ``dns_view`` in which the network should be allocated.
:param str network_view: The Infoblox ``network_view`` where the network should be allocated.
:param int netmask: The netmask of the desired network. Can be up to 32 for v4 networks, and 128 for v6 networks.
:param list [str] containers: A list of network containers in which the network should be allocated, given in
:term:`CIDR` notation.
:param str comment: Optionally, a comment can be added to the network allocation.
"""
for container in [ipaddress.ip_network(con) for con in containers]:
for network in container.subnets(new_prefix=netmask):
if objects.Network.search(conn, network=str(network)) is None:
created_net = objects.Network.create(
conn, network=str(network), view=dns_view, network_view=network_view, comment=comment
)
if created_net.response != "Infoblox Object already Exists":
return ipaddress.ip_network(created_net.network)
msg = f"IP container {container} appears to be full."
logger.warning(msg)
msg = f"Cannot allocate anything in {containers}, check whether any IP space is available."
raise AllocationError(msg)
def hostname_available(hostname: str) -> bool:
"""Check whether a hostname is still available **in Infoblox**.
Check whether Infoblox already has a :class:`infoblox_client.objects.HostRecord` that matches the given hostname.
.. warning::
This method only checks within the Infoblox instance, and not the rest of the internet. The hostname could
therefore still be taken elsewhere.
:param hostname: The hostname to be checked.
:type hostname: str
"""
conn, _ = _setup_connection()
return objects.HostRecord.search(conn, name=hostname) is None
def allocate_v4_network(service_type: str, comment: str | None = "") -> ipaddress.IPv4Network:
"""Allocate a new IPv4 network in Infoblox.
Allocate an IPv4 network for a specific service type. The service type should be defined in the :term:`OSS`
parameters of :term:`GSO`, from which the containers and netmask will be used.
:param service_type: The service type for which the network is allocated.
:type service_type: str
:param comment: A comment to be added to the allocated network in Infoblox.
:type comment: str, optional
"""
conn, oss = _setup_connection()
netmask = getattr(oss, service_type).V4.mask
containers = getattr(oss, service_type).V4.containers
dns_view = getattr(oss, service_type).dns_view
network_view = getattr(oss, service_type).network_view
return ipaddress.IPv4Network(_allocate_network(conn, dns_view, network_view, netmask, containers, comment))
def allocate_v6_network(service_type: str, comment: str | None = "") -> ipaddress.IPv6Network:
"""Allocate a new IPv6 network in Infoblox.
Allocate an IPv6 network for a specific service type. The service type should be defined in the :term:`OSS`
parameters of :term:`GSO`, from which the containers and netmask will be used.
:param service_type: The service type for which the network is allocated.
:type service_type: str
:param comment: A comment to be added to the allocated network in Infoblox.
:type comment: str, optional
"""
conn, oss = _setup_connection()
netmask = getattr(oss, service_type).V6.mask
containers = getattr(oss, service_type).V6.containers
dns_view = getattr(oss, service_type).dns_view
network_view = getattr(oss, service_type).network_view
return ipaddress.IPv6Network(_allocate_network(conn, dns_view, network_view, netmask, containers, comment))
def find_network_by_cidr(
ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network,
) -> objects.Network | None:
"""Find a network in Infoblox by its :term:`CIDR`.
:param ip_network: The :term:`CIDR` that is searched.
:type ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network
"""
conn, _ = _setup_connection()
return objects.Network.search(conn, cidr=str(ip_network))
def delete_network(ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network) -> None:
"""Delete a network in Infoblox.
Delete a network that is allocated in Infoblox, by passing the :term:`CIDR` to be deleted. The :term:`CIDR` must
exactly match an existing entry in Infoblox, otherwise this method raises a :class:`DeletionError`
:param ip_network: The network that should get deleted.
:type ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network
"""
network = find_network_by_cidr(ip_network)
if network:
network.delete()
else:
msg = f"Could not find network {ip_network}, nothing has been deleted."
raise DeletionError(msg)
def allocate_host(
hostname: str, service_type: str, cname_aliases: list[str], comment: str
) -> tuple[ipaddress.IPv4Address, ipaddress.IPv6Address]:
"""Allocate a new host record in Infoblox.
Create a new host record in Infoblox, by providing a hostname, and the service type that is associated with this new
host. Most likely to be a loopback interface. If the hostname is not available in Infoblox (due to a potential
collision) this method raises an :class:`AllocationError`.
:param hostname: The :term:`FQDN` of the new host
:type hostname: str
:param service_type: The service type from which IP resources should be used.
:type service_type: str
:param cname_aliases: A list of any :term:`CNAME` aliases that should be associated with this host. Most often this
will be a single loopback address.
:type cname_aliases: list[str]
:param comment: A comment that is added to the host record in Infoblox, should be the ``subscription_id`` of the new
:class:`Router` subscription.
:type comment: str
"""
if not hostname_available(hostname):
msg = f"Cannot allocate new host, FQDN {hostname} already taken."
raise AllocationError(msg)
conn, oss = _setup_connection()
allocation_networks_v4 = getattr(oss, service_type).V4.networks
allocation_networks_v6 = getattr(oss, service_type).V6.networks
dns_view = getattr(oss, service_type).dns_view
network_view = getattr(oss, service_type).network_view
created_v6 = None
for ipv6_range in allocation_networks_v6:
v6_alloc = objects.IPAllocation.next_available_ip_from_cidr(network_view, str(ipv6_range))
ipv6_object = objects.IP.create(ip=v6_alloc, mac=NULL_MAC, configure_for_dhcp=False)
try:
new_host = objects.HostRecord.create(
conn,
ip=ipv6_object,
name=hostname,
aliases=cname_aliases,
comment=comment,
view=dns_view,
network_view=network_view,
)
created_v6 = ipaddress.IPv6Address(new_host.ipv6addr)
except InfobloxCannotCreateObject:
msg = f"Cannot find 1 available IP address in network {ipv6_range}."
logger.warning(msg)
if created_v6 is None:
msg = f"Cannot find 1 available IP address in networks {allocation_networks_v6}."
raise AllocationError(msg)
created_v4 = None
for ipv4_range in allocation_networks_v4:
v4_alloc = objects.IPAllocation.next_available_ip_from_cidr(network_view, str(ipv4_range))
ipv4_object = objects.IP.create(ip=v4_alloc, mac=NULL_MAC, configure_for_dhcp=False)
new_host = objects.HostRecord.search(conn, name=hostname)
new_host.ipv4addrs = [ipv4_object]
try:
new_host.update()
new_host = objects.HostRecord.search(conn, name=hostname)
created_v4 = ipaddress.IPv4Address(new_host.ipv4addr)
except InfobloxCannotUpdateObject:
msg = f"Cannot find 1 available IP address in network {ipv4_range}."
logger.warning(msg)
if created_v4 is None:
msg = f"Cannot find 1 available IP address in networks {allocation_networks_v4}."
raise AllocationError(msg)
return created_v4, created_v6
def create_host_by_ip(
hostname: str,
ipv4_address: ipaddress.IPv4Address,
ipv6_address: ipaddress.IPv6Address,
service_type: str,
comment: str,
) -> None:
"""Create a new host record with a given IPv4 and IPv6 address.
:param str hostname: The :term:`FQDN` of the new host.
:param IPv4Address ipv4_address: The IPv4 address of the new host.
:param IPv6Address ipv6_address: The IPv6 address of the new host.
:param str service_type: The relevant service type, used to deduce the correct ``dns_view`` and ``network_view`` in
Infoblox.
:param str comment: The comment stored in this Infoblox record, most likely the relevant ``subscription_id`` in
:term:`GSO`.
"""
if not hostname_available(hostname):
msg = f"Cannot allocate new host, FQDN {hostname} already taken."
raise AllocationError(msg)
conn, oss = _setup_connection()
ipv6_object = objects.IP.create(ip=str(ipv6_address), mac=NULL_MAC, configure_for_dhcp=False)
ipv4_object = objects.IP.create(ip=str(ipv4_address), mac=NULL_MAC, configure_for_dhcp=False)
dns_view = getattr(oss, service_type).dns_view
network_view = getattr(oss, service_type).network_view
# This needs to be done in two steps, otherwise only one of the IP addresses is stored.
objects.HostRecord.create(
conn, ip=ipv6_object, name=hostname, comment=comment, view=dns_view, network_view=network_view
)
new_host = find_host_by_fqdn(hostname)
new_host.ipv4addrs = [ipv4_object]
new_host.update()
def find_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> objects.HostRecord | None:
"""Find a host record in Infoblox by its associated IP address.
:param ip_addr: The IP address of a host that is searched for.
:type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address
"""
conn, _ = _setup_connection()
if ip_addr.version == 4: # noqa: PLR2004, the 4 in IPv4 is well-known and not a "magic value."
return objects.HostRecord.search(
conn,
ipv4addr=ip_addr,
return_fields=["ipv4addrs", "name", "view", "aliases", "comment"],
)
return objects.HostRecordV6.search(
conn,
ipv6addr=ip_addr,
return_fields=["ipv6addrs", "name", "view", "aliases", "comment"],
)
def find_host_by_fqdn(fqdn: str) -> objects.HostRecord:
"""Find a host record by its associated :term:`FQDN`.
:param fqdn: The :term:`FQDN` of a host that is searched for.
:type fqdn: str
"""
conn, _ = _setup_connection()
return objects.HostRecord.search(
conn,
name=fqdn,
return_fields=["ipv4addrs", "name", "view", "aliases", "comment"],
)
def find_v6_host_by_fqdn(fqdn: str) -> objects.HostRecordV6:
"""Find a host record by its associated :term:`FQDN`.
This specific method will return the IPv6 variant of a record, if it exists.
:param str fqdn: The :term:`FQDN` of a host that is searched for.
"""
conn, _ = _setup_connection()
return objects.HostRecordV6.search(
conn, name=fqdn, return_fields=["ipv6addrs", "name", "view", "aliases", "comment"]
)
def delete_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> None:
"""Delete a host from Infoblox.
Delete a host record in Infoblox, by providing the IP address that is associated with the record. Raises a
:class:`DeletionError` if no record can be found in Infoblox.
:param ip_addr: The IP address of the host record that should get deleted.
:type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address
"""
host = find_host_by_ip(ip_addr)
if host:
host.delete()
else:
msg = f"Could not find host at {ip_addr}, nothing has been deleted."
raise DeletionError(msg)
def delete_host_by_fqdn(fqdn: str) -> None:
"""Delete a host from Infoblox.
Delete a host record in Infoblox, by providing the :term:`FQDN` that is associated with the record. Raises a
:class:`DeletionError` if no record can be found in Infoblox.
:param fqdn: The :term:`FQDN` of the host record that should get deleted.
:type fqdn: str
"""
host = find_host_by_fqdn(fqdn)
if host:
host.delete()
else:
msg = f"Could not find host at {fqdn}, nothing has been deleted."
raise DeletionError(msg)