Skip to content
Snippets Groups Projects
Verified Commit fd0bb61c authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

add search methods for networks and hosts to the Infoblox service

parent 6bf66680
Branches
Tags
1 merge request!65Feature/add infoblox service
......@@ -121,6 +121,16 @@ def allocate_v6_network(service_type: str, comment: str | None = "") -> ipaddres
return ipaddress.IPv6Network(_allocate_network(conn, dns_view, netmask, containers, comment))
def find_network_by_cidr(ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network) -> objects.Network | None:
"""Find a network in Infoblox by its {term}`CIDR`.
:param ip_network: The {term}`CIDR` that is searched.
:type ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network
"""
conn, _ = _setup_connection()
return objects.Network.search(conn, cidr=str(ip_network))
def delete_network(ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network) -> None:
"""Delete a network in Infoblox.
......@@ -130,8 +140,7 @@ def delete_network(ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network) ->
:param ip_network: The network that should get deleted.
:type ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network
"""
conn, _ = _setup_connection()
network = objects.Network.search(conn, cidr=str(ip_network))
network = find_network_by_cidr(ip_network)
if network:
network.delete()
else:
......@@ -139,7 +148,7 @@ def delete_network(ip_network: ipaddress.IPv4Network | ipaddress.IPv6Network) ->
def allocate_host(
hostname: str, service_type: str, cname_aliases: list[str], comment: str | None = ""
hostname: str, service_type: str, cname_aliases: list[str], comment: str
) -> tuple[ipaddress.IPv4Address, ipaddress.IPv6Address]:
"""Allocate a new host record in Infoblox.
......@@ -154,8 +163,9 @@ def allocate_host(
:param cname_aliases: A list of any {term}`CNAME` aliases that should be associated with this host. Most often this
will be a single loopback address.
:type cname_aliases: list[str]
:param comment: Optionally, a comment can be added to the host record in Infoblox.
:type comment: str, optional
:param comment: A comment that is added to the host record in Infoblox, should be the `subscription_id` of the new
{class}`Router` subscription.
:type comment: str
"""
if not hostname_available(hostname):
raise AllocationError(f"Cannot allocate new host, FQDN {hostname} already taken.")
......@@ -199,6 +209,32 @@ def allocate_host(
return created_v4, created_v6
def find_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> objects.HostRecord | None:
"""Find a host record in Infoblox by its associated IP address.
:param ip_addr: The IP address of a host that is searched for.
:type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address
"""
conn, _ = _setup_connection()
if ip_addr.version == 4:
return objects.HostRecord.search(
conn, ipv4addr=ip_addr, return_fields=["ipv4addrs", "name", "view", "aliases", "comment"]
)
return objects.HostRecord.search(
conn, ipv6addr=ip_addr, return_fields=["ipv6addrs", "name", "view", "aliases", "comment"]
)
def find_host_by_fqdn(fqdn: str) -> objects.HostRecord | None:
"""Find a host record by its associated {term}`FQDN`.
:param fqdn: The {term}`FQDN` of a host that is searched for.
:type fqdn: str
"""
conn, _ = _setup_connection()
return objects.HostRecord.search(conn, name=fqdn, return_fields=["ipv4addrs", "name", "view", "aliases", "comment"])
def delete_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> None:
"""Delete a host from Infoblox.
......@@ -208,8 +244,7 @@ def delete_host_by_ip(ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address) ->
:param ip_addr: The IP address of the host record that should get deleted.
:type ip_addr: ipaddress.IPv4Address | ipaddress.IPv6Address
"""
conn, _ = _setup_connection()
host = objects.HostRecord.search(conn, ipv4addr=ip_addr)
host = find_host_by_ip(ip_addr)
if host:
host.delete()
else:
......@@ -225,6 +260,8 @@ def delete_host_by_fqdn(fqdn: str) -> None:
:param fqdn: The FQDN of the host record that should get deleted.
:type fqdn: str
"""
conn, _ = _setup_connection()
host = objects.HostRecord.search(conn, name=fqdn)
host.delete()
host = find_host_by_fqdn(fqdn)
if host:
host.delete()
else:
raise DeletionError(f"Could not find host at {fqdn}, nothing has been deleted.")
from typing import Optional
import infoblox_client.objects
# noinspection PyProtectedMember
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice
......@@ -8,15 +10,15 @@ from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUID
from orchestrator.workflow import StepList, conditional, done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import validator
from gso.products.product_blocks import router as router_pb
from gso.products.product_types import router
from gso.products.product_types.router import RouterInactive, RouterProvisioning
from gso.products.product_types.site import Site
from gso.products.shared import PortNumber
from gso.services import infoblox, provisioning_proxy, subscriptions
from gso.services.provisioning_proxy import pp_interaction
from gso.workflows.utils import customer_selector, iso_from_ipv4
from products.product_blocks.router import RouterRole, RouterVendor, generate_fqdn
def _site_selector() -> Choice:
......@@ -39,10 +41,19 @@ def initial_input_form_generator(product_name: str) -> FormGenerator:
router_site: _site_selector() # type: ignore
hostname: str
ts_port: PortNumber
router_vendor: router_pb.RouterVendor
router_role: router_pb.RouterRole
router_vendor: RouterVendor
router_role: RouterRole
is_ias_connected: Optional[bool]
@validator("hostname", allow_reuse=True)
def hostname_must_be_available(cls, hostname: str, **kwargs) -> str:
selected_site = Site.from_subscription(kwargs["values"].get("router_site", "")).site
input_fqdn = generate_fqdn(hostname, selected_site.site_name, selected_site.site_country_code)
if not infoblox.hostname_available(input_fqdn):
raise ValueError(f'FQDN "{input_fqdn}" is not available.')
return hostname
user_input = yield CreateRouterForm
return user_input.dict()
......@@ -58,10 +69,35 @@ def create_subscription(product: UUIDstr, customer: UUIDstr) -> State:
}
@step("Initialize subscription")
def initialize_subscription(
subscription: RouterInactive,
hostname: str,
ts_port: PortNumber,
router_vendor: RouterVendor,
router_site: str,
router_role: RouterRole,
) -> State:
subscription.router.router_ts_port = ts_port
subscription.router.router_vendor = router_vendor
subscription.router.router_site = Site.from_subscription(router_site).site
fqdn = generate_fqdn(
hostname, subscription.router.router_site.site_name, subscription.router.router_site.site_country_code
)
subscription.router.router_fqdn = fqdn
subscription.router.router_role = router_role
subscription.router.router_access_via_ts = True
subscription.description = f"Router {fqdn}"
subscription = RouterProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING)
return {"subscription": subscription}
@step("Allocate loopback interfaces in IPAM")
def ipam_allocate_loopback(subscription: RouterProvisioning, is_ias_connected: bool) -> State:
fqdn = subscription.router.router_fqdn
loopback_v4, loopback_v6 = infoblox.allocate_host(f"lo0.{fqdn}", "LO", [fqdn])
loopback_v4, loopback_v6 = infoblox.allocate_host(f"lo0.{fqdn}", "LO", [fqdn], str(subscription.subscription_id))
subscription.router.router_lo_ipv4_address = loopback_v4
subscription.router.router_lo_ipv6_address = loopback_v6
......@@ -75,36 +111,15 @@ def ipam_allocate_loopback(subscription: RouterProvisioning, is_ias_connected: b
def ipam_allocate_ias_networks(subscription: RouterProvisioning) -> State:
fqdn = subscription.router.router_fqdn
subscription.router.router_si_ipv4_network = infoblox.allocate_v4_network("SI", f"SI for {fqdn}")
subscription.router.router_ias_lt_ipv4_network = infoblox.allocate_v4_network("LT_IAS", f"LT for {fqdn}")
subscription.router.router_ias_lt_ipv6_network = infoblox.allocate_v6_network("LT_IAS", f"LT for {fqdn}")
return {"subscription": subscription}
@step("Initialize subscription")
def initialize_subscription(
subscription: router.RouterInactive,
hostname: str,
ts_port: PortNumber,
router_vendor: router_pb.RouterVendor,
router_site: str,
router_role: router_pb.RouterRole,
) -> State:
subscription.router.router_ts_port = ts_port
subscription.router.router_vendor = router_vendor
subscription.router.router_site = Site.from_subscription(router_site).site
fqdn = (
f"{hostname}.{subscription.router.router_site.site_name.lower()}."
f"{subscription.router.router_site.site_country_code.lower()}"
".geant.net"
subscription.router.router_si_ipv4_network = infoblox.allocate_v4_network(
"SI", f"SI for {fqdn} - {subscription.subscription_id}"
)
subscription.router.router_ias_lt_ipv4_network = infoblox.allocate_v4_network(
"LT_IAS", f"LT for {fqdn} - {subscription.subscription_id}"
)
subscription.router.router_ias_lt_ipv6_network = infoblox.allocate_v6_network(
"LT_IAS", f"LT for {fqdn} - {subscription.subscription_id}"
)
subscription.router.router_fqdn = fqdn
subscription.router.router_role = router_role
subscription.router.router_access_via_ts = True
subscription.description = f"Router {fqdn}"
subscription = router.RouterProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING)
return {"subscription": subscription}
......@@ -135,6 +150,44 @@ def provision_router_real(subscription: RouterProvisioning, process_id: UUIDstr)
}
@step("Verify IPAM resources for loopback interface")
def verify_ipam_loopback(subscription: RouterProvisioning) -> State:
host_record = infoblox.find_host_by_fqdn(f"lo0.{subscription.router.router_fqdn}")
if not host_record or str(subscription.subscription_id) not in host_record.comment:
return {
"ipam_warning": "!!! Loopback record is incorrectly configured in IPAM, please investigate this manually. !!!"
}
return {"subscription": subscription}
@step("Verify IPAM resources for IAS/LT networks")
def verify_ipam_ias(subscription: RouterProvisioning) -> State:
si_ipv4_network = infoblox.find_network_by_cidr(subscription.router.router_si_ipv4_network)
ias_lt_ipv4_network = infoblox.find_network_by_cidr(subscription.router.router_ias_lt_ipv4_network)
ias_lt_ipv6_network = infoblox.find_network_by_cidr(subscription.router.router_ias_lt_ipv6_network)
new_state = {}
if not si_ipv4_network or str(subscription.subscription_id) not in si_ipv4_network.comment:
new_state = {
"ipam_si_warning": f"SI IPv4 network expected at {subscription.router.router_si_ipv4_network}, "
f"but it was not found or misconfigured, please investigate and adjust if necessary."
}
if not ias_lt_ipv4_network or str(subscription.subscription_id) not in ias_lt_ipv4_network.comment:
new_state = new_state | {
"ipam_ias_lt_ipv4_warning": f"IAS/LT IPv4 network expected at {subscription.router.router_ias_lt_ipv4_network}, "
f"but it was not found or misconfigured, please investigate and adjust if necessary."
}
if not ias_lt_ipv6_network or str(subscription.subscription_id) not in ias_lt_ipv6_network.comment:
new_state = new_state | {
"ipam_ias_lt_ipv6_warning": f"IAS/LT IPv6 network expected at {subscription.router.router_ias_lt_ipv6_network}, "
f"but it was not found or misconfigured, please investigate and adjust if necessary."
}
return new_state
@workflow(
"Create router",
initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
......@@ -152,6 +205,8 @@ def create_router() -> StepList:
>> should_allocate_ias(ipam_allocate_ias_networks)
>> pp_interaction(provision_router_dry, 3)
>> pp_interaction(provision_router_real, 3)
>> verify_ipam_loopback
>> should_allocate_ias(verify_ipam_ias)
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment