Skip to content
Snippets Groups Projects
Verified Commit f6b358fe authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

add network and host allocation to Infoblox service

parent 38b73455
Branches
Tags
1 merge request!65Feature/add infoblox service
import ipaddress import ipaddress
from logging import getLogger
from infoblox_client import connector, objects from infoblox_client import connector, objects
from infoblox_client.exceptions import InfobloxCannotCreateObject, InfobloxCannotUpdateObject
from gso.settings import load_oss_params from gso.settings import IPAMParams, load_oss_params
logger = getLogger(__name__)
def _setup_connection() -> connector.Connector:
oss = load_oss_params() class AllocationError(Exception):
pass
class DeletionError(Exception):
pass
def _setup_connection() -> tuple[connector.Connector, IPAMParams]:
oss = load_oss_params().IPAM
options = { options = {
"host": oss.IPAM.INFOBLOX.host, "host": oss.INFOBLOX.host,
"username": oss.IPAM.INFOBLOX.username, "username": oss.INFOBLOX.username,
"password": oss.IPAM.INFOBLOX.password, "password": oss.INFOBLOX.password,
"wapi_version": oss.IPAM.INFOBLOX.wapi_version[1:], # remove the 'v' in front of the version number "wapi_version": oss.INFOBLOX.wapi_version[1:], # remove the 'v' in front of the version number
"ssl_verify": True if oss.IPAM.INFOBLOX.scheme == "https" else False, "ssl_verify": True if oss.INFOBLOX.scheme == "https" else False,
} }
return connector.Connector(options) return connector.Connector(options), oss
def allocate_network(ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network) -> objects.Network: def _allocate_network(
conn = _setup_connection() conn: connector.Connector,
return objects.Network.create(conn, str(ip_network)) dns_view: str,
netmask: int,
containers: list[str],
comment: str,
) -> ipaddress.IPv4Network | ipaddress.IPv6Network:
for container in [ipaddress.ip_network(con) for con in containers]:
for network in container.subnets(new_prefix=netmask):
if objects.Network.search(conn, network=str(network)) is None:
created_net = objects.Network.create(conn, network=str(network), dns_view=dns_view, comment=comment)
if created_net.response != "Infoblox Object already Exists":
return ipaddress.ip_network(created_net.network)
logger.warning(f"IP container {container} appears to be full.")
raise AllocationError(f"Cannot allocate anything in {containers}, check whether any IP space is available.")
def hostname_available(hostname: str) -> bool:
conn, _ = _setup_connection()
return objects.HostRecord.search(conn, name=hostname) is None
def allocate_v4_network(service_type: str, comment: str | None = "") -> ipaddress.IPv4Network:
conn, oss = _setup_connection()
netmask = getattr(oss, service_type).V4.mask
containers = getattr(oss, service_type).V4.containers
dns_view = getattr(oss, service_type).dns_view
return _allocate_network(conn, dns_view, netmask, containers, comment)
def allocate_v6_network(service_type: str, comment: str | None = "") -> ipaddress.IPv6Network:
conn, oss = _setup_connection()
netmask = getattr(oss, service_type).V6.mask
containers = getattr(oss, service_type).V6.containers
dns_view = getattr(oss, service_type).dns_view
return _allocate_network(conn, dns_view, netmask, containers, comment)
def delete_network(ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network) -> None: def delete_network(ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network) -> None:
conn = _setup_connection() conn = _setup_connection()
network = objects.Network.search(conn, cidr=str(ip_network)) network = objects.Network.search(conn, cidr=str(ip_network))
network.delete() if network:
network.delete()
else:
raise DeletionError(f"Could not find network {ip_network}, nothing has been deleted.")
def allocate_host(hostname: str, net_cidr: ipaddress.IPv4Network | ipaddress.IPv6Network) -> objects.HostRecord: def allocate_host(
conn = _setup_connection() hostname: str, service_type: str, cname_aliases: list[str], comment: str | None = ""
address = objects.IPAllocation.next_available_ip_from_cidr("default", net_cidr) ) -> tuple[ipaddress.IPv4Address, ipaddress.IPv6Address]:
ip_object = objects.IP.create(address, "00:00:00:00:00:00", configure_for_dhcp=False) if not hostname_available(hostname):
return objects.HostRecord.create(conn, name=hostname, ip=ip_object) raise AllocationError(f"Cannot allocate new host, FQDN {hostname} already taken.")
conn, oss = _setup_connection()
allocation_networks_v4 = getattr(oss, service_type).V4.networks
allocation_networks_v6 = getattr(oss, service_type).V6.networks
dns_view = getattr(oss, service_type).dns_view
created_v6 = None
for ipv6_range in allocation_networks_v6:
v6_alloc = objects.IPAllocation.next_available_ip_from_cidr(dns_view, ipv6_range)
ipv6_object = objects.IP.create(ip=v6_alloc, mac="00:00:00:00:00:00", configure_for_dhcp=False)
try:
new_host = objects.HostRecord.create(
conn, ip=ipv6_object, name=hostname, aliases=cname_aliases, comment=comment, dns_view=dns_view
)
created_v6 = ipaddress.IPv6Address(new_host.ipv6addr)
except InfobloxCannotCreateObject:
logger.warning(f"Cannot find 1 available IP address in network {ipv6_range}.")
if created_v6 is None:
raise AllocationError(f"Cannot find 1 available IP address in networks {allocation_networks_v6}.")
created_v4 = None
for ipv4_range in allocation_networks_v4:
v4_alloc = objects.IPAllocation.next_available_ip_from_cidr(dns_view, ipv4_range)
ipv4_object = objects.IP.create(ip=v4_alloc, mac="00:00:00:00:00:00", configure_for_dhcp=False)
new_host = objects.HostRecord.search(conn, name=hostname)
new_host.ipv4addrs = [ipv4_object]
try:
new_host.update()
created_v4 = ipaddress.IPv4Address(new_host.ipv4addr)
except InfobloxCannotUpdateObject:
logger.warning(f"Cannot find 1 available IP address in network {ipv4_range}.")
if created_v4 is None:
raise AllocationError(f"Cannot find 1 available IP address in networks {allocation_networks_v4}.")
return created_v4, created_v6
def delete_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> None: def delete_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> None:
conn = _setup_connection() conn = _setup_connection()
host = objects.HostRecord.search(conn, ipv4addr=ip_addr) host = objects.HostRecord.search(conn, ipv4addr=ip_addr)
host.delete() if host:
host.delete()
else:
raise DeletionError(f"Could not find host at {ip_addr}, nothing has been deleted.")
def delete_host_by_fqdn(fqdn: str) -> None: def delete_host_by_fqdn(fqdn: str) -> None:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment