Skip to content
Snippets Groups Projects
Commit 613a4862 authored by geant-release-service's avatar geant-release-service
Browse files

Finished release 0.7.

parents d2cc2bb2 a081b2aa
Branches
Tags 0.7
No related merge requests found
Pipeline #85781 passed
Showing
with 325 additions and 117 deletions
......@@ -9,7 +9,9 @@ repos:
- --fix
- --preview
- --ignore=PLR0917,PLR0914
- --extend-exclude=test/*
# Run the formatter.
- id: ruff-format
args:
- --preview
- --exclude=test/*
# Changelog
All notable changes to this project will be documented in this file.
## [0.7] - 2024-02-21
- Infoblox client: added support for the `network_view` (IPAM).
## [0.6] - 2024-02-07
- Removed the import workflows from migrations.
......
......@@ -58,6 +58,9 @@ Glossary of terms
Simple Network Management Protocol: a protocol that's used for gathering data, widely used for network management
and monitoring.
TWAMP
Two-way Active Measurement Protocol.
UUID
Universally Unique Identifier
......
......@@ -14,4 +14,6 @@ Dark_fiber
PHASE 1
[Mm]odify
AAI
[M|m]iddleware
\ No newline at end of file
[M|m]iddleware
TWAMP
Pydantic
......@@ -82,13 +82,8 @@ class OIDCUserModel(dict):
if the attribute is one of the registered claims or raises an AttributeError
if the key is not found.
Args:
----
key: The attribute name to retrieve.
Returns:
-------
The value of the attribute if it exists, otherwise raises AttributeError.
:param str key: The attribute name to retrieve.
:return: The value of the attribute if it exists, otherwise raises AttributeError.
"""
try:
return object.__getattribute__(self, key)
......@@ -167,6 +162,7 @@ class OPAResult(BaseModel):
----------
- result (bool): Indicates whether the access request is allowed or denied.
- decision_id (str): A unique identifier for the decision made by OPA.
"""
result: bool = False
......@@ -208,15 +204,10 @@ class OIDCUser(HTTPBearer):
This is used as a security module in Fastapi projects
Args:
----
request: Starlette request method.
token: Optional value to directly pass a token.
Returns:
-------
OIDCUserModel object.
:param Request request: Starlette request method.
:param str token: Optional value to directly pass a token.
:return: OIDCUserModel object.
"""
if not oauth2lib_settings.OAUTH2_ACTIVE:
return None
......@@ -380,16 +371,12 @@ def opa_decision(
to authorize requests based on OPA policies. It utilizes OIDC for user information and makes a
call to the OPA service to determine authorization.
Args:
----
opa_url: URL of the Open Policy Agent service.
oidc_security: An instance of OIDCUser for user authentication.
auto_error: If True, automatically raises an HTTPException on authorization failure.
opa_kwargs: Additional keyword arguments to be passed to the OPA input.
:param str opa_url: URL of the Open Policy Agent service.
:param OIDCUser oidc_security: An instance of OIDCUser for user authentication.
:param bool auto_error: If True, automatically raises an HTTPException on authorization failure.
:param Mapping[str, str] | None opa_kwargs: Additional keyword arguments to be passed to the OPA input.
Returns:
-------
An asynchronous decision function that can be used as a dependency in FastAPI endpoints.
:return: An asynchronous decision function that can be used as a dependency in FastAPI endpoints.
"""
async def _opa_decision(
......@@ -407,6 +394,7 @@ def opa_decision(
request: Request object that will be used to retrieve request metadata.
user_info: The OIDCUserModel object that will be checked
async_request: The :term:`httpx` client.
"""
if not (oauth2lib_settings.OAUTH2_ACTIVE and oauth2lib_settings.OAUTH2_AUTHORIZATION_ACTIVE):
return None
......
......@@ -34,8 +34,6 @@ def get_oidc_user() -> OIDCUser:
This function returns the instance of OIDCUser initialized in the module.
It is typically used for accessing the OIDCUser across different parts of the application.
Returns
-------
OIDCUser: The instance of OIDCUser configured with OAuth2 settings.
:return OIDCUser: The instance of OIDCUser configured with OAuth2 settings.
"""
return oidc_user
......@@ -18,31 +18,36 @@
"V4": {"containers": [], "networks": ["1.1.0.0/24"], "mask": 0},
"V6": {"containers": [], "networks": ["dead:beef::/64"], "mask": 0},
"domain_name": ".lo",
"dns_view": "default"
"dns_view": "default",
"network_view": "default"
},
"TRUNK": {
"V4": {"containers": ["1.1.1.0/24"], "networks": [], "mask": 31},
"V6": {"containers": ["dead:beef::/64"], "networks": [], "mask": 126},
"domain_name": ".trunk",
"dns_view": "default"
"dns_view": "default",
"network_view": "default"
},
"GEANT_IP": {
"V4": {"containers": ["1.1.2.0/24"], "networks": [], "mask": 31},
"V6": {"containers": ["dead:beef::/64"], "networks": [], "mask": 126},
"domain_name": ".geantip",
"dns_view": "default"
"dns_view": "default",
"network_view": "default"
},
"SI": {
"V4": {"containers": ["1.1.3.0/24"], "networks": [], "mask": 31},
"V6": {"containers": ["dead:beef::/64"], "networks": [], "mask": 126},
"domain_name": ".si",
"dns_view": "default"
"dns_view": "default",
"network_view": "default"
},
"LT_IAS": {
"V4": {"containers": ["1.1.4.0/24"], "networks": [], "mask": 31},
"V6": {"containers": ["dead:beef::/64"], "networks": [], "mask": 126},
"domain_name": ".ltias",
"dns_view": "default"
"dns_view": "default",
"network_view": "default"
}
},
"MONITORING": {
......@@ -77,5 +82,13 @@
"THIRD_PARTY_API_KEYS": {
"AnsibleDynamicInventoryGenerator": "REALLY_random_AND_secure_T0keN",
"Application_2": "another_REALY_random_AND_3cure_T0keN"
},
"EMAIL": {
"from_address": "noreply@nren.local",
"smtp_host": "smtp.nren.local",
"smtp_port": 487,
"starttls_enabled": true,
"smtp_username": "username",
"smtp_password": "password"
}
}
......@@ -68,12 +68,12 @@ class RouterBlockProvisioning(RouterBlockInactive, lifecycle=[SubscriptionLifecy
router_fqdn: str
router_ts_port: PortNumber
router_access_via_ts: bool | None = None
router_lo_ipv4_address: ipaddress.IPv4Address | None = None
router_lo_ipv6_address: ipaddress.IPv6Address | None = None
router_lo_iso_address: str | None = None
router_role: RouterRole | None = None
router_site: SiteBlockProvisioning | None
router_access_via_ts: bool
router_lo_ipv4_address: ipaddress.IPv4Address
router_lo_ipv6_address: ipaddress.IPv6Address
router_lo_iso_address: str
router_role: RouterRole
router_site: SiteBlockProvisioning
vendor: RouterVendor
......
......@@ -82,16 +82,16 @@ class SiteBlockInactive(
class SiteBlockProvisioning(SiteBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]):
"""A site that's currently being provisioned, see :class:`SiteBlock`."""
site_name: str | None = None
site_city: str | None = None
site_country: str | None = None
site_country_code: str | None = None
site_latitude: LatitudeCoordinate | None = None
site_longitude: LongitudeCoordinate | None = None
site_internal_id: int | None = None
site_bgp_community_id: int | None = None
site_tier: SiteTier | None = None
site_ts_address: str | None = None
site_name: str
site_city: str
site_country: str
site_country_code: str
site_latitude: LatitudeCoordinate
site_longitude: LongitudeCoordinate
site_internal_id: int
site_bgp_community_id: int
site_tier: SiteTier
site_ts_address: str
class SiteBlock(SiteBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
......@@ -120,4 +120,4 @@ class SiteBlock(SiteBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE])
#: The address of the terminal server that this router is connected to. The terminal server provides out of band
#: access. This is required in case a link goes down, or when a router is initially added to the network and it
#: does not have any IP trunks connected to it.
site_ts_address: str | None = None
site_ts_address: str
......@@ -41,29 +41,32 @@ def _setup_connection() -> tuple[connector.Connector, IPAMParams]:
def _allocate_network(
conn: connector.Connector, dns_view: str, netmask: int, containers: list[str], comment: str | None = ""
conn: connector.Connector,
dns_view: str,
network_view: str,
netmask: int,
containers: list[str],
comment: str | None = "",
) -> ipaddress.IPv4Network | ipaddress.IPv6Network:
"""Allocate a new network in Infoblox.
The function will go over all given containers, and try to allocate a network within the available IP space. If no
space is available, this method raises an :class:`AllocationError`.
:param conn: An active Infoblox connection.
:type conn: :class:`infoblox_client.connector.Connector`
:param dns_view: The Infoblox ``dns_view`` in which the network should be allocated.
:type dns_view: str
:param netmask: The netmask of the desired network. Can be up to 32 for v4 networks, and 128 for v6 networks.
:type netmask: int
:param containers: A list of network containers in which the network should be allocated, given in :term:`CIDR`
notation.
:type containers: list[str]
:param comment: Optionally, a comment can be added to the network allocation.
:type comment: str, optional
:param :class:`infoblox_client.connector.Connector` conn: An active Infoblox connection.
:param str dns_view: The Infoblox ``dns_view`` in which the network should be allocated.
:param str network_view: The Infoblox ``network_view`` where the network should be allocated.
:param int netmask: The netmask of the desired network. Can be up to 32 for v4 networks, and 128 for v6 networks.
:param list [str] containers: A list of network containers in which the network should be allocated, given in
:term:`CIDR` notation.
:param str comment: Optionally, a comment can be added to the network allocation.
"""
for container in [ipaddress.ip_network(con) for con in containers]:
for network in container.subnets(new_prefix=netmask):
if objects.Network.search(conn, network=str(network)) is None:
created_net = objects.Network.create(conn, network=str(network), dns_view=dns_view, comment=comment)
created_net = objects.Network.create(
conn, network=str(network), dns_view=dns_view, network_view=network_view, comment=comment
)
if created_net.response != "Infoblox Object already Exists":
return ipaddress.ip_network(created_net.network)
msg = f"IP container {container} appears to be full."
......@@ -104,8 +107,9 @@ def allocate_v4_network(service_type: str, comment: str | None = "") -> ipaddres
netmask = getattr(oss, service_type).V4.mask
containers = getattr(oss, service_type).V4.containers
dns_view = getattr(oss, service_type).dns_view
network_view = getattr(oss, service_type).network_view
return ipaddress.IPv4Network(_allocate_network(conn, dns_view, netmask, containers, comment))
return ipaddress.IPv4Network(_allocate_network(conn, dns_view, network_view, netmask, containers, comment))
def allocate_v6_network(service_type: str, comment: str | None = "") -> ipaddress.IPv6Network:
......@@ -123,8 +127,9 @@ def allocate_v6_network(service_type: str, comment: str | None = "") -> ipaddres
netmask = getattr(oss, service_type).V6.mask
containers = getattr(oss, service_type).V6.containers
dns_view = getattr(oss, service_type).dns_view
network_view = getattr(oss, service_type).network_view
return ipaddress.IPv6Network(_allocate_network(conn, dns_view, netmask, containers, comment))
return ipaddress.IPv6Network(_allocate_network(conn, dns_view, network_view, netmask, containers, comment))
def find_network_by_cidr(
......@@ -184,10 +189,11 @@ def allocate_host(
allocation_networks_v4 = getattr(oss, service_type).V4.networks
allocation_networks_v6 = getattr(oss, service_type).V6.networks
dns_view = getattr(oss, service_type).dns_view
network_view = getattr(oss, service_type).network_view
created_v6 = None
for ipv6_range in allocation_networks_v6:
v6_alloc = objects.IPAllocation.next_available_ip_from_cidr(dns_view, str(ipv6_range))
v6_alloc = objects.IPAllocation.next_available_ip_from_cidr(network_view, str(ipv6_range))
ipv6_object = objects.IP.create(ip=v6_alloc, mac=NULL_MAC, configure_for_dhcp=False)
try:
new_host = objects.HostRecord.create(
......@@ -197,6 +203,7 @@ def allocate_host(
aliases=cname_aliases,
comment=comment,
dns_view=dns_view,
network_view=network_view,
)
created_v6 = ipaddress.IPv6Address(new_host.ipv6addr)
except InfobloxCannotCreateObject:
......@@ -209,7 +216,7 @@ def allocate_host(
created_v4 = None
for ipv4_range in allocation_networks_v4:
v4_alloc = objects.IPAllocation.next_available_ip_from_cidr(dns_view, str(ipv4_range))
v4_alloc = objects.IPAllocation.next_available_ip_from_cidr(network_view, str(ipv4_range))
ipv4_object = objects.IP.create(ip=v4_alloc, mac=NULL_MAC, configure_for_dhcp=False)
new_host = objects.HostRecord.search(conn, name=hostname)
new_host.ipv4addrs = [ipv4_object]
......@@ -240,7 +247,8 @@ def create_host_by_ip(
:param str hostname: The :term:`FQDN` of the new host.
:param IPv4Address ipv4_address: The IPv4 address of the new host.
:param IPv6Address ipv6_address: The IPv6 address of the new host.
:param str service_type: The relevant service type, used to deduce the correct ``dns_view`` in Infoblox.
:param str service_type: The relevant service type, used to deduce the correct ``dns_view`` and ``network_view`` in
Infoblox.
:param str comment: The comment stored in this Infoblox record, most likely the relevant ``subscription_id`` in
:term:`GSO`.
"""
......@@ -252,9 +260,12 @@ def create_host_by_ip(
ipv6_object = objects.IP.create(ip=ipv6_address, mac=NULL_MAC, configure_for_dhcp=False)
ipv4_object = objects.IP.create(ip=ipv4_address, mac=NULL_MAC, configure_for_dhcp=False)
dns_view = getattr(oss, service_type).dns_view
network_view = getattr(oss, service_type).network_view
# This needs to be done in two steps, otherwise only one of the IP addresses is stored.
objects.HostRecord.create(conn, ip=ipv6_object, name=hostname, comment=comment, dns_view=dns_view)
objects.HostRecord.create(
conn, ip=ipv6_object, name=hostname, comment=comment, dns_view=dns_view, network_view=network_view
)
new_host = find_host_by_fqdn(hostname)
new_host.ipv4addrs = [ipv4_object]
new_host.update()
......
"""The mailer service sends notification emails, as part of workflows that require interaction with external parties."""
import smtplib
from email.message import EmailMessage
from ssl import create_default_context
from gso.settings import load_oss_params
def send_mail(recipient: str, subject: str, body: str) -> None:
"""Send an email message to the given address.
Only supports STARTTLS, not SSL.
:param str recipient: The destination address.
:param str subject: The email subject.
:param str body: The contents of the email message.
"""
email_params = load_oss_params().EMAIL
msg = EmailMessage()
msg["From"] = email_params.from_address
msg["To"] = recipient
msg["Subject"] = subject
msg.set_content(body)
with smtplib.SMTP(email_params.smtp_host, email_params.smtp_port) as s:
if email_params.starttls_enabled:
tls_context = create_default_context()
s.starttls(context=tls_context)
if email_params.smtp_username and email_params.smtp_password:
s.login(email_params.smtp_username, email_params.smtp_password)
s.send_message(msg)
......@@ -81,6 +81,7 @@ class ServiceNetworkParams(BaseSettings):
V6: V6NetworkParams
domain_name: str
dns_view: str
network_view: str
class IPAMParams(BaseSettings):
......@@ -152,6 +153,18 @@ class NetBoxParams(BaseSettings):
api: str
class EmailParams(BaseSettings):
"""Parameters for the email service."""
# TODO: Use more strict types after we've migrated to Pydantic 2.x
from_address: str
smtp_host: str
smtp_port: int
starttls_enabled: bool
smtp_username: str | None
smtp_password: str | None
class OSSParams(BaseSettings):
"""The set of parameters required for running :term:`GSO`."""
......@@ -162,6 +175,7 @@ class OSSParams(BaseSettings):
PROVISIONING_PROXY: ProvisioningProxyParams
CELERY: CeleryParams
THIRD_PARTY_API_KEYS: dict[str, str]
EMAIL: EmailParams
def load_oss_params() -> OSSParams:
......
......@@ -77,8 +77,7 @@ def available_interfaces_choices_including_current_members(
],
)
options = {
interface["name"]: f"{interface['name']} - {interface['module']['display']} - {interface['description']}"
for interface in available_interfaces
interface["name"]: f"{interface['name']} {interface['description']}" for interface in available_interfaces
}
return Choice("ae member", zip(options.keys(), options.items(), strict=True)) # type: ignore[arg-type]
......@@ -271,3 +270,23 @@ def validate_interface_name_list(interface_name_list: list, vendor: str) -> list
)
raise ValueError(error_msg)
return interface_name_list
def validate_tt_number(tt_number: str) -> str:
"""Validate a string to match a specific pattern.
This method checks if the input string starts with 'TT#' and is followed by exactly 16 digits.
:param str tt_number: The TT number as string to validate
:return str: The TT number string if TT number match was successful, otherwise it will raise a ValueError.
"""
pattern = r"^TT#\d{16}$"
if not bool(re.match(pattern, tt_number)):
err_msg = (
f"The given TT number: {tt_number} is not valid. "
f" A valid TT number starts with 'TT#' followed by 16 digits."
)
raise ValueError(err_msg)
return tt_number
......@@ -17,12 +17,12 @@ from pynetbox.models.dcim import Interfaces
from gso.products.product_blocks.iptrunk import (
IptrunkInterfaceBlockInactive,
IptrunkSideBlockProvisioning,
IptrunkSideBlockInactive,
IptrunkType,
PhyPortCapacity,
)
from gso.products.product_blocks.router import RouterVendor
from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning
from gso.products.product_types.iptrunk import IptrunkInactive
from gso.products.product_types.router import Router
from gso.services import infoblox, subscriptions
from gso.services.crm import get_customer_by_name
......@@ -36,6 +36,7 @@ from gso.utils.helpers import (
validate_interface_name_list,
validate_iptrunk_unique_interface,
validate_router_in_netbox,
validate_tt_number,
)
......@@ -57,6 +58,10 @@ def initial_input_form_generator(product_name: str) -> FormGenerator:
iptrunk_speed: PhyPortCapacity
iptrunk_minimum_links: int
@validator("tt_number", allow_reuse=True)
def validate_tt_number(cls, tt_number: str) -> str:
return validate_tt_number(tt_number)
initial_user_input = yield CreateIptrunkForm
router_enum_a = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True)) # type: ignore[arg-type]
......@@ -101,13 +106,11 @@ def initial_input_form_generator(product_name: str) -> FormGenerator:
side_a_ae_members: ae_members_side_a # type: ignore[valid-type]
@validator("side_a_ae_members", allow_reuse=True)
def validate_iptrunk_unique_interface_side_a(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]:
return validate_iptrunk_unique_interface(side_a_ae_members)
@validator("side_a_ae_members", allow_reuse=True)
def validate_interface_name_members(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]:
def validate_side_a_ae_members(cls, side_a_ae_members: list[LAGMember]) -> list[LAGMember]:
validate_iptrunk_unique_interface(side_a_ae_members)
vendor = get_router_vendor(router_a)
return validate_interface_name_list(side_a_ae_members, vendor)
validate_interface_name_list(side_a_ae_members, vendor)
return side_a_ae_members
user_input_side_a = yield CreateIptrunkSideAForm
# Remove the selected router for side A, to prevent any loops
......@@ -153,13 +156,11 @@ def initial_input_form_generator(product_name: str) -> FormGenerator:
side_b_ae_members: ae_members_side_b # type: ignore[valid-type]
@validator("side_b_ae_members", allow_reuse=True)
def validate_iptrunk_unique_interface_side_b(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]:
return validate_iptrunk_unique_interface(side_b_ae_members)
@validator("side_b_ae_members", allow_reuse=True)
def validate_interface_name_members(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]:
def validate_side_b_ae_members(cls, side_b_ae_members: list[LAGMember]) -> list[LAGMember]:
validate_iptrunk_unique_interface(side_b_ae_members)
vendor = get_router_vendor(router_b)
return validate_interface_name_list(side_b_ae_members, vendor)
validate_interface_name_list(side_b_ae_members, vendor)
return side_b_ae_members
user_input_side_b = yield CreateIptrunkSideBForm
......@@ -184,7 +185,7 @@ def create_subscription(product: UUIDstr, customer: str) -> State:
@step("Get information from IPAM")
def get_info_from_ipam(subscription: IptrunkProvisioning) -> State:
def get_info_from_ipam(subscription: IptrunkInactive) -> State:
"""Allocate IP resources in :term:`IPAM`."""
subscription.iptrunk.iptrunk_ipv4_network = infoblox.allocate_v4_network(
"TRUNK",
......@@ -242,14 +243,13 @@ def initialize_subscription(
)
side_names = sorted([side_a.router_site.site_name, side_b.router_site.site_name])
subscription.description = f"IP trunk {side_names[0]} {side_names[1]}, geant_s_sid:{geant_s_sid}"
subscription = IptrunkProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING)
return {"subscription": subscription}
@step("Provision IP trunk interface [DRY RUN]")
def provision_ip_trunk_iface_dry(
subscription: IptrunkProvisioning,
subscription: IptrunkInactive,
callback_route: str,
process_id: UUIDstr,
tt_number: str,
......@@ -277,7 +277,7 @@ def provision_ip_trunk_iface_dry(
@step("Provision IP trunk interface [FOR REAL]")
def provision_ip_trunk_iface_real(
subscription: IptrunkProvisioning,
subscription: IptrunkInactive,
callback_route: str,
process_id: UUIDstr,
tt_number: str,
......@@ -305,7 +305,7 @@ def provision_ip_trunk_iface_real(
@step("Check IP connectivity of the trunk")
def check_ip_trunk_connectivity(
subscription: IptrunkProvisioning,
subscription: IptrunkInactive,
callback_route: str,
) -> State:
"""Check successful connectivity across the new trunk."""
......@@ -323,7 +323,7 @@ def check_ip_trunk_connectivity(
@step("Provision IP trunk ISIS interface [DRY RUN]")
def provision_ip_trunk_isis_iface_dry(
subscription: IptrunkProvisioning,
subscription: IptrunkInactive,
callback_route: str,
process_id: UUIDstr,
tt_number: str,
......@@ -351,7 +351,7 @@ def provision_ip_trunk_isis_iface_dry(
@step("Provision IP trunk ISIS interface [FOR REAL]")
def provision_ip_trunk_isis_iface_real(
subscription: IptrunkProvisioning,
subscription: IptrunkInactive,
callback_route: str,
process_id: UUIDstr,
tt_number: str,
......@@ -379,7 +379,7 @@ def provision_ip_trunk_isis_iface_real(
@step("Check ISIS adjacency")
def check_ip_trunk_isis(
subscription: IptrunkProvisioning,
subscription: IptrunkInactive,
callback_route: str,
) -> State:
"""Run an Ansible playbook to confirm :term:`ISIS` adjacency."""
......@@ -396,7 +396,7 @@ def check_ip_trunk_isis(
@step("NextBox integration")
def reserve_interfaces_in_netbox(subscription: IptrunkProvisioning) -> State:
def reserve_interfaces_in_netbox(subscription: IptrunkInactive) -> State:
"""Create the :term:`LAG` interfaces in NetBox and attach the lag interfaces to the physical interfaces."""
nbclient = NetboxClient()
for trunk_side in subscription.iptrunk.iptrunk_sides:
......@@ -428,22 +428,25 @@ def reserve_interfaces_in_netbox(subscription: IptrunkProvisioning) -> State:
}
def _allocate_interfaces_in_netbox(iptrunk_side: IptrunkSideBlockProvisioning) -> None:
def _allocate_interfaces_in_netbox(iptrunk_side: IptrunkSideBlockInactive) -> None:
for interface in iptrunk_side.iptrunk_side_ae_members:
NetboxClient().allocate_interface(
device_name=iptrunk_side.iptrunk_side_node.router_fqdn,
iface_name=interface.interface_name,
)
fqdn = iptrunk_side.iptrunk_side_node.router_fqdn
iface_name = interface.interface_name
if not fqdn or not iface_name:
msg = f"FQDN and/or interface name missing in subscription {interface.owner_subscription_id}"
raise ValueError(msg)
NetboxClient().allocate_interface(device_name=fqdn, iface_name=iface_name)
@step("Allocate interfaces in Netbox for side A")
def netbox_allocate_side_a_interfaces(subscription: IptrunkProvisioning) -> None:
def netbox_allocate_side_a_interfaces(subscription: IptrunkInactive) -> None:
"""Allocate the :term:`LAG` interfaces for the Nokia router on side A."""
_allocate_interfaces_in_netbox(subscription.iptrunk.iptrunk_sides[0])
@step("Allocate interfaces in Netbox for side B")
def netbox_allocate_side_b_interfaces(subscription: IptrunkProvisioning) -> None:
def netbox_allocate_side_b_interfaces(subscription: IptrunkInactive) -> None:
"""Allocate the :term:`LAG` interfaces for the Nokia router on side B."""
_allocate_interfaces_in_netbox(subscription.iptrunk.iptrunk_sides[1])
......@@ -484,7 +487,7 @@ def create_iptrunk() -> StepList:
>> pp_interaction(check_ip_trunk_isis)
>> side_a_is_nokia(netbox_allocate_side_a_interfaces)
>> side_b_is_nokia(netbox_allocate_side_b_interfaces)
>> set_status(SubscriptionLifecycle.ACTIVE)
>> set_status(SubscriptionLifecycle.PROVISIONING)
>> resync
>> done
)
......@@ -7,9 +7,11 @@ from orchestrator.types import FormGenerator, State, UUIDstr
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import validator
from gso.products.product_types.iptrunk import Iptrunk
from gso.services.provisioning_proxy import execute_playbook, pp_interaction
from gso.utils.helpers import validate_tt_number
def _initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
......@@ -23,6 +25,10 @@ def _initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
)
tt_number: str
@validator("tt_number", allow_reuse=True)
def validate_tt_number(cls, tt_number: str) -> str:
return validate_tt_number(tt_number)
user_input = yield DeployTWAMPForm
return user_input.dict()
......
......@@ -39,6 +39,7 @@ from gso.utils.helpers import (
available_lags_choices,
get_router_vendor,
validate_interface_name_list,
validate_tt_number,
)
from gso.utils.workflow_steps import set_isis_to_90000
......@@ -71,6 +72,10 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
migrate_to_different_site: bool = False
restore_isis_metric: bool = True
@validator("tt_number", allow_reuse=True, pre=True, always=True)
def validate_tt_number(cls, tt_number: str) -> str:
return validate_tt_number(tt_number)
migrate_form_input = yield IPTrunkMigrateForm
current_routers = [
......
......@@ -32,6 +32,7 @@ from gso.utils.helpers import (
get_router_vendor,
validate_interface_name_list,
validate_iptrunk_unique_interface,
validate_tt_number,
)
......@@ -91,6 +92,10 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
iptrunk_ipv4_network: ipaddress.IPv4Network = ReadOnlyField(subscription.iptrunk.iptrunk_ipv4_network)
iptrunk_ipv6_network: ipaddress.IPv6Network = ReadOnlyField(subscription.iptrunk.iptrunk_ipv6_network)
@validator("tt_number", allow_reuse=True)
def validate_tt_number(cls, tt_number: str) -> str:
return validate_tt_number(tt_number)
initial_user_input = yield ModifyIptrunkForm
ae_members_side_a = initialize_ae_members(subscription, initial_user_input.dict(), 0)
......
......@@ -16,6 +16,7 @@ from orchestrator.workflows.steps import (
unsync,
)
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import validator
from gso.products.product_blocks.iptrunk import IptrunkSideBlock
from gso.products.product_blocks.router import RouterVendor
......@@ -23,7 +24,7 @@ from gso.products.product_types.iptrunk import Iptrunk
from gso.services import infoblox
from gso.services.netbox_client import NetboxClient
from gso.services.provisioning_proxy import execute_playbook, pp_interaction
from gso.utils.helpers import get_router_vendor
from gso.utils.helpers import get_router_vendor, validate_tt_number
from gso.utils.workflow_steps import set_isis_to_90000
......@@ -40,6 +41,10 @@ def initial_input_form_generator() -> FormGenerator:
clean_up_ipam: bool = True
clean_up_netbox: bool = True
@validator("tt_number", allow_reuse=True)
def validate_tt_number(cls, tt_number: str) -> str:
return validate_tt_number(tt_number)
user_input = yield TerminateForm
return user_input.dict()
......
......@@ -2,11 +2,12 @@
from typing import Any
from orchestrator.config.assignee import Assignee
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice
from orchestrator.forms.validators import Choice, Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, conditional, done, init, step, workflow
from orchestrator.workflow import StepList, conditional, done, init, inputstep, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import validator
......@@ -18,7 +19,7 @@ from gso.products.product_blocks.router import (
RouterVendor,
generate_fqdn,
)
from gso.products.product_types.router import RouterInactive, RouterProvisioning
from gso.products.product_types.router import RouterInactive
from gso.products.product_types.site import Site
from gso.services import infoblox, subscriptions
from gso.services.crm import get_customer_by_name
......@@ -106,15 +107,16 @@ def initialize_subscription(
subscription.router.vendor = vendor
subscription.description = f"Router {fqdn}"
subscription = RouterProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING)
return {"subscription": subscription}
@step("Allocate loopback interfaces in IPAM")
def ipam_allocate_loopback(subscription: RouterProvisioning) -> State:
def ipam_allocate_loopback(subscription: RouterInactive) -> State:
"""Allocate :term:`IPAM` resources for the loopback interface."""
fqdn = subscription.router.router_fqdn
if not fqdn:
msg = f"Router fqdn for subscription id {subscription.subscription_id} is missing!"
raise ValueError(msg)
loopback_v4, loopback_v6 = infoblox.allocate_host(f"lo0.{fqdn}", "LO", [fqdn], str(subscription.subscription_id))
subscription.router.router_lo_ipv4_address = loopback_v4
......@@ -125,17 +127,21 @@ def ipam_allocate_loopback(subscription: RouterProvisioning) -> State:
@step("Create NetBox Device")
def create_netbox_device(subscription: RouterProvisioning) -> State:
def create_netbox_device(subscription: RouterInactive) -> State:
"""Create a new NOKIA device in Netbox."""
NetboxClient().create_device(
subscription.router.router_fqdn,
str(subscription.router.router_site.site_tier), # type: ignore[union-attr]
)
fqdn = subscription.router.router_fqdn
site_tier = subscription.router.router_site.site_tier if subscription.router.router_site else None
if not fqdn or not site_tier:
msg = f"FQDN and/or Site tier missing in router subscription {subscription.subscription_id}!"
raise ValueError(msg)
NetboxClient().create_device(fqdn, site_tier)
return {"subscription": subscription}
@step("Verify IPAM resources for loopback interface")
def verify_ipam_loopback(subscription: RouterProvisioning) -> State:
def verify_ipam_loopback(subscription: RouterInactive) -> State:
"""Validate the :term:`IPAM` resources for the loopback interface."""
host_record = infoblox.find_host_by_fqdn(f"lo0.{subscription.router.router_fqdn}")
if not host_record or str(subscription.subscription_id) not in host_record.comment:
......@@ -144,6 +150,63 @@ def verify_ipam_loopback(subscription: RouterProvisioning) -> State:
return {"subscription": subscription}
@inputstep("Prompt to reboot", assignee=Assignee.SYSTEM)
def prompt_reboot_router(subscription: RouterInactive) -> FormGenerator:
"""Wait for confirmation from an operator that the router has been rebooted."""
class RebootPrompt(FormPage):
class Config:
title = "Please reboot before continuing"
if subscription.router.router_site and subscription.router.router_site.site_ts_address:
info_label_1: Label = (
f"Base config has been deployed. Please log in via the console using https://" # type: ignore[assignment]
f"{subscription.router.router_site.site_ts_address}."
)
else:
info_label_1 = "Base config has been deployed. Please log in via the console." # type: ignore[assignment]
info_label_2: Label = "Reboot the router, and once it is up again, press submit to continue the workflow." # type: ignore[assignment]
yield RebootPrompt
return {}
@inputstep("Prompt to test the console", assignee=Assignee.SYSTEM)
def prompt_console_login() -> FormGenerator:
"""Wait for confirmation from an operator that the router can be logged into."""
class ConsolePrompt(FormPage):
class Config:
title = "Verify local authentication"
info_label_1: Label = (
"Verify that you are able to log in to the router via the console using the admin account." # type: ignore[assignment]
)
info_label_2: Label = "Once this is done, press submit to continue the workflow." # type: ignore[assignment]
yield ConsolePrompt
return {}
@inputstep("Prompt IMS insertion", assignee=Assignee.SYSTEM)
def prompt_insert_in_ims() -> FormGenerator:
"""Wait for confirmation from an operator that the router has been inserted in IMS."""
class IMSPrompt(FormPage):
class Config:
title = "Update IMS mediation server"
info_label_1: Label = "Insert the router into IMS." # type: ignore[assignment]
info_label_2: Label = "Once this is done, press submit to continue the workflow." # type: ignore[assignment]
yield IMSPrompt
return {}
@workflow(
"Create router",
initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
......@@ -169,9 +232,12 @@ def create_router() -> StepList:
>> pp_interaction(deploy_base_config_dry)
>> pp_interaction(deploy_base_config_real)
>> verify_ipam_loopback
>> prompt_reboot_router
>> prompt_console_login
>> prompt_insert_in_ims
>> router_is_nokia(create_netbox_device)
>> pp_interaction(run_checks_after_base_config)
>> set_status(SubscriptionLifecycle.ACTIVE)
>> set_status(SubscriptionLifecycle.PROVISIONING)
>> resync
>> done
)
......@@ -2,10 +2,12 @@
from typing import Any
from orchestrator.config.assignee import Assignee
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, UUIDstr
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflow import StepList, done, init, inputstep, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import root_validator
......@@ -188,6 +190,36 @@ def add_device_to_librenms(subscription: Router) -> State:
return {"librenms_device": librenms_result}
@inputstep("Prompt RADIUS insertion", assignee=Assignee.SYSTEM)
def prompt_insert_in_radius() -> FormGenerator:
"""Wait for confirmation from an operator that the router has been inserted in RADIUS."""
class RADIUSPrompt(FormPage):
class Config:
title = "Please update RADIUS before continuing"
info_label: Label = "Insert the router into RADIUS, and continue the workflow once this has been completed." # type: ignore[assignment]
yield RADIUSPrompt
return {}
@inputstep("Prompt RADIUS login", assignee=Assignee.SYSTEM)
def prompt_radius_login() -> FormGenerator:
"""Wait for confirmation from an operator that the router can be logged into using RADIUS."""
class RADIUSPrompt(FormPage):
class Config:
title = "Please check RADIUS before continuing"
info_label: Label = "Log in to the router using RADIUS, and continue the workflow when this was successful." # type: ignore[assignment]
yield RADIUSPrompt
return {}
@step("Update subscription model")
def update_subscription_model(subscription: Router) -> State:
"""Update the database model, such that it should not be reached via :term:`OOB` access anymore."""
......@@ -221,6 +253,8 @@ def update_ibgp_mesh() -> StepList:
>> pp_interaction(add_all_pe_to_p_real)
>> pp_interaction(check_ibgp_session)
>> add_device_to_librenms
>> prompt_insert_in_radius
>> prompt_radius_login
>> update_subscription_model
>> resync
>> done
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment