-
Karel van Klink authored
These are not needed anymore, as MkDocs will pick up on them automatically
Karel van Klink authoredThese are not needed anymore, as MkDocs will pick up on them automatically
infoblox.py 14.07 KiB
"""The Infoblox service that allocates IPAM resources used in GSO products."""
import ipaddress
from logging import getLogger
from infoblox_client import connector, objects
from infoblox_client.exceptions import (
InfobloxCannotCreateObject,
InfobloxCannotUpdateObject,
)
from gso.settings import IPAMParams, load_oss_params
from gso.utils.types.ip_address import IPv4AddressType, IPv6AddressType
logger = getLogger(__name__)
NULL_MAC = "00:00:00:00:00:00"
class AllocationError(Exception):
"""Raised when Infoblox failed to allocate a resource."""
def _setup_connection() -> tuple[connector.Connector, IPAMParams]:
"""Set up a new connection with an Infoblox instance.
:return: A tuple that has an Infoblox ``Connector`` instance, and IPAM parameters.
:rtype: tuple[:class:`Connector`, IPAMParams]
"""
oss = load_oss_params().IPAM
options = {
"host": oss.INFOBLOX.host,
"username": oss.INFOBLOX.username,
"password": oss.INFOBLOX.password,
"wapi_version": oss.INFOBLOX.wapi_version,
"ssl_verify": oss.INFOBLOX.scheme == "https",
}
return connector.Connector(options), oss
def _allocate_network(
conn: connector.Connector,
dns_view: str,
network_view: str,
netmask: int,
containers: list[str],
comment: str | None = "",
) -> ipaddress.IPv4Network | ipaddress.IPv6Network:
"""Allocate a new network in Infoblox.
The function will go over all given containers, and try to allocate a network within the available IP space. If no
space is available, this method raises an :class:`AllocationError`.
:param :class:`infoblox_client.connector.Connector` conn: An active Infoblox connection.
:param str dns_view: The Infoblox ``dns_view`` in which the network should be allocated.
:param str network_view: The Infoblox ``network_view`` where the network should be allocated.
:param int netmask: The netmask of the desired network. Can be up to 32 for v4 networks, and 128 for v6 networks.
:param list [str] containers: A list of network containers in which the network should be allocated, given in
CIDR notation.
:param str comment: Optionally, a comment can be added to the network allocation.
"""
for container in [ipaddress.ip_network(con) for con in containers]:
for network in container.subnets(new_prefix=netmask):
if objects.Network.search(conn, network=str(network)) is None:
created_net = objects.Network.create(
conn, network=str(network), view=dns_view, network_view=network_view, comment=comment
)
if created_net.response != "Infoblox Object already Exists":
return ipaddress.ip_network(created_net.network)
msg = f"IP container {container} appears to be full."
logger.warning(msg)
msg = f"Cannot allocate anything in {containers}, check whether any IP space is available."
raise AllocationError(msg)
def hostname_available(hostname: str) -> bool:
"""Check whether a hostname is still available **in Infoblox**.
Check whether Infoblox already has a :class:`infoblox_client.objects.HostRecord` that matches the given hostname.
.. warning::
This method only checks within the Infoblox instance, and not the rest of the internet. The hostname could
therefore still be taken elsewhere.
:param hostname: The hostname to be checked.
:type hostname: str
"""
conn, _ = _setup_connection()
return objects.HostRecord.search(conn, name=hostname) is None
def allocate_v4_network(service_type: str, comment: str | None = "") -> ipaddress.IPv4Network:
"""Allocate a new IPv4 network in Infoblox.
Allocate an IPv4 network for a specific service type. The service type should be defined in the OSS
parameters of :term:`GSO`, from which the containers and netmask will be used.
:param service_type: The service type for which the network is allocated.
:type service_type: str
:param comment: A comment to be added to the allocated network in Infoblox.
:type comment: str, optional
"""
conn, oss = _setup_connection()
netmask = getattr(oss, service_type).V4.mask
containers = getattr(oss, service_type).V4.containers
dns_view = getattr(oss, service_type).dns_view
network_view = getattr(oss, service_type).network_view
return ipaddress.IPv4Network(_allocate_network(conn, dns_view, network_view, netmask, containers, comment))
def allocate_v6_network(service_type: str, comment: str | None = "") -> ipaddress.IPv6Network:
"""Allocate a new IPv6 network in Infoblox.
Allocate an IPv6 network for a specific service type. The service type should be defined in the :term:`OSS`
parameters of :term:`GSO`, from which the containers and netmask will be used.
:param service_type: The service type for which the network is allocated.
:type service_type: str
:param comment: A comment to be added to the allocated network in Infoblox.
:type comment: str, optional
"""
conn, oss = _setup_connection()
netmask = getattr(oss, service_type).V6.mask
containers = getattr(oss, service_type).V6.containers
dns_view = getattr(oss, service_type).dns_view
network_view = getattr(oss, service_type).network_view
return ipaddress.IPv6Network(_allocate_network(conn, dns_view, network_view, netmask, containers, comment))
def find_network_by_cidr(
ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network,
) -> objects.Network | None:
"""Find a network in Infoblox by its :term:`CIDR`.
:param ip_network: The :term:`CIDR` that is searched.
:type ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network
"""
conn, _ = _setup_connection()
return objects.Network.search(conn, cidr=str(ip_network))
def delete_network(ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network) -> None:
"""Delete a network in Infoblox.
Delete a network that is allocated in Infoblox, by passing the :term:`CIDR` to be deleted. The :term:`CIDR` must
exactly match an existing entry in Infoblox, otherwise this method raises a :class:`DeletionError`
:param ip_network: The network that should get deleted.
:type ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network
"""
network = find_network_by_cidr(ip_network)
if network:
network.delete()
else:
msg = f"Could not find network {ip_network}, nothing has been deleted."
logger.warning(msg)
def allocate_host(
hostname: str, service_type: str, cname_aliases: list[str], comment: str
) -> tuple[ipaddress.IPv4Address, ipaddress.IPv6Address]:
"""Allocate a new host record in Infoblox.
Create a new host record in Infoblox, by providing a hostname, and the service type that is associated with this new
host. Most likely to be a loopback interface. If the hostname is not available in Infoblox (due to a potential
collision) this method raises an :class:`AllocationError`.
:param hostname: The :term:`FQDN` of the new host
:type hostname: str
:param service_type: The service type from which IP resources should be used.
:type service_type: str
:param cname_aliases: A list of any :term:`CNAME` aliases that should be associated with this host. Most often this
will be a single loopback address.
:type cname_aliases: list[str]
:param comment: A comment that is added to the host record in Infoblox, should be the ``subscription_id`` of the new
:class:`Router` subscription.
:type comment: str
"""
if not hostname_available(hostname):
msg = f"Cannot allocate new host, FQDN {hostname} already taken."
raise AllocationError(msg)
conn, oss = _setup_connection()
allocation_networks_v4 = getattr(oss, service_type).V4.networks
allocation_networks_v6 = getattr(oss, service_type).V6.networks
dns_view = getattr(oss, service_type).dns_view
network_view = getattr(oss, service_type).network_view
created_v6 = None
for ipv6_range in allocation_networks_v6:
v6_alloc = objects.IPAllocation.next_available_ip_from_cidr(network_view, str(ipv6_range))
ipv6_object = objects.IP.create(ip=v6_alloc, mac=NULL_MAC, configure_for_dhcp=False)
try:
new_host = objects.HostRecord.create(
conn,
ip=ipv6_object,
name=hostname,
aliases=cname_aliases,
comment=comment,
view=dns_view,
network_view=network_view,
)
created_v6 = ipaddress.IPv6Address(new_host.ipv6addr)
except InfobloxCannotCreateObject as e:
msg = f"Cannot find 1 available IP address in network {ipv6_range}."
logger.warning(msg, exc_info=e)
if created_v6 is None:
msg = f"Cannot find 1 available IP address in networks {allocation_networks_v6}."
raise AllocationError(msg)
created_v4 = None
for ipv4_range in allocation_networks_v4:
v4_alloc = objects.IPAllocation.next_available_ip_from_cidr(network_view, str(ipv4_range))
ipv4_object = objects.IP.create(ip=v4_alloc, mac=NULL_MAC, configure_for_dhcp=False)
new_host = objects.HostRecord.search(conn, name=hostname)
new_host.ipv4addrs = [ipv4_object]
try:
new_host.update()
new_host = objects.HostRecord.search(conn, name=hostname)
created_v4 = ipaddress.IPv4Address(new_host.ipv4addr)
except InfobloxCannotUpdateObject as e:
msg = f"Cannot find 1 available IP address in network {ipv4_range}."
logger.warning(msg, exc_info=e)
if created_v4 is None:
msg = f"Cannot find 1 available IP address in networks {allocation_networks_v4}."
raise AllocationError(msg)
return created_v4, created_v6
def create_host_by_ip(
hostname: str,
ipv4_address: IPv4AddressType,
ipv6_address: IPv6AddressType,
service_type: str,
comment: str,
) -> None:
"""Create a new host record with a given IPv4 and IPv6 address.
:param str hostname: The :term:`FQDN` of the new host.
:param IPv4Address ipv4_address: The IPv4 address of the new host.
:param IPv6Address ipv6_address: The IPv6 address of the new host.
:param str service_type: The relevant service type, used to deduce the correct ``dns_view`` and ``network_view`` in
Infoblox.
:param str comment: The comment stored in this Infoblox record, most likely the relevant ``subscription_id`` in
:term:`GSO`.
"""
if not hostname_available(hostname):
msg = f"FQDN '{hostname}' is already in use, allocation aborted."
raise AllocationError(msg)
conn, oss = _setup_connection()
ipv6_object = objects.IP.create(ip=str(ipv6_address), mac=NULL_MAC, configure_for_dhcp=False)
ipv4_object = objects.IP.create(ip=str(ipv4_address), mac=NULL_MAC, configure_for_dhcp=False)
dns_view = getattr(oss, service_type).dns_view
network_view = getattr(oss, service_type).network_view
# This needs to be done in two steps, otherwise only one of the IP addresses is stored.
objects.HostRecord.create(
conn, ip=ipv6_object, name=hostname, comment=comment, view=dns_view, network_view=network_view
)
new_host = find_host_by_fqdn(hostname)
new_host.ipv4addrs = [ipv4_object]
new_host.update()
def find_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> objects.HostRecord | None:
"""Find a host record in Infoblox by its associated IP address.
:param ip_addr: The IP address of a host that is searched for.
:type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address
"""
conn, _ = _setup_connection()
if ip_addr.version == 4: # noqa: PLR2004, the 4 in IPv4 is well-known and not a "magic value."
return objects.HostRecord.search(
conn,
ipv4addr=ip_addr,
return_fields=["ipv4addrs", "name", "view", "aliases", "comment"],
)
return objects.HostRecordV6.search(
conn,
ipv6addr=ip_addr,
return_fields=["ipv6addrs", "name", "view", "aliases", "comment"],
)
def find_host_by_fqdn(fqdn: str) -> objects.HostRecord:
"""Find a host record by its associated :term:`FQDN`.
:param fqdn: The :term:`FQDN` of a host that is searched for.
:type fqdn: str
"""
conn, _ = _setup_connection()
return objects.HostRecord.search(
conn,
name=fqdn,
return_fields=["ipv4addrs", "name", "view", "aliases", "comment"],
)
def find_v6_host_by_fqdn(fqdn: str) -> objects.HostRecordV6:
"""Find a host record by its associated :term:`FQDN`.
This specific method will return the IPv6 variant of a record, if it exists.
:param str fqdn: The :term:`FQDN` of a host that is searched for.
"""
conn, _ = _setup_connection()
return objects.HostRecordV6.search(
conn, name=fqdn, return_fields=["ipv6addrs", "name", "view", "aliases", "comment"]
)
def delete_host_by_ip(ip_addr: IPv4AddressType | ipaddress.IPv6Address) -> None:
"""Delete a host from Infoblox.
Delete a host record in Infoblox, by providing the IP address that is associated with the record. Raises a
:class:`DeletionError` if no record can be found in Infoblox.
:param ip_addr: The IP address of the host record that should get deleted.
:type ip_addr: IPv4AddressType | ipaddress.IPv6Address
"""
host = find_host_by_ip(ip_addr)
if host:
host.delete()
else:
msg = f"Could not find host at {ip_addr}, nothing has been deleted."
logger.warning(msg)
def delete_host_by_fqdn(fqdn: str) -> None:
"""Delete a host from Infoblox.
Delete a host record in Infoblox, by providing the :term:`FQDN` that is associated with the record. Raises a
:class:`DeletionError` if no record can be found in Infoblox.
:param fqdn: The :term:`FQDN` of the host record that should get deleted.
:type fqdn: str
"""
host = find_host_by_fqdn(fqdn)
if host:
host.delete()
else:
msg = f"Could not find host at {fqdn}, nothing has been deleted."
logger.warning(msg)