Skip to content
Snippets Groups Projects
Verified Commit 40bd8735 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Reformat all docstrings from Sphinx to Google style

parent 96171bb2
Branches
Tags
1 merge request!316Replace Sphinx with MkDocs
Showing
with 69 additions and 67 deletions
...@@ -24,7 +24,7 @@ from gso.utils.types.interfaces import LAGMember, LAGMemberList, PhysicalPortCap ...@@ -24,7 +24,7 @@ from gso.utils.types.interfaces import LAGMember, LAGMemberList, PhysicalPortCap
def initial_input_form_generator() -> FormGenerator: def initial_input_form_generator() -> FormGenerator:
"""Take all information passed to this workflow by the :term:`API` endpoint that was called.""" """Take all information passed to this workflow by the API endpoint that was called."""
class CreateIptrunkForm(FormPage): class CreateIptrunkForm(FormPage):
model_config = ConfigDict(title="Import Iptrunk") model_config = ConfigDict(title="Import Iptrunk")
...@@ -123,7 +123,7 @@ def update_ipam_stub_for_subscription( ...@@ -123,7 +123,7 @@ def update_ipam_stub_for_subscription(
iptrunk_ipv4_network: ipaddress.IPv4Network, iptrunk_ipv4_network: ipaddress.IPv4Network,
iptrunk_ipv6_network: ipaddress.IPv6Network, iptrunk_ipv6_network: ipaddress.IPv6Network,
) -> State: ) -> State:
"""Update :term:`IPAM` information in the subscription.""" """Update IPAM information in the subscription."""
subscription.iptrunk.iptrunk_ipv4_network = iptrunk_ipv4_network subscription.iptrunk.iptrunk_ipv4_network = iptrunk_ipv4_network
subscription.iptrunk.iptrunk_ipv6_network = iptrunk_ipv6_network subscription.iptrunk.iptrunk_ipv6_network = iptrunk_ipv6_network
......
...@@ -434,7 +434,7 @@ def check_ip_trunk_connectivity(subscription: IptrunkInactive) -> LSOState: ...@@ -434,7 +434,7 @@ def check_ip_trunk_connectivity(subscription: IptrunkInactive) -> LSOState:
@step("[DRY RUN] Provision IP trunk ISIS interface") @step("[DRY RUN] Provision IP trunk ISIS interface")
def provision_ip_trunk_isis_iface_dry(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> LSOState: def provision_ip_trunk_isis_iface_dry(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> LSOState:
"""Perform a dry run of deploying :term:`ISIS` configuration.""" """Perform a dry run of deploying ISIS configuration."""
extra_vars = { extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)), "wfo_trunk_json": json.loads(json_dumps(subscription)),
"dry_run": True, "dry_run": True,
...@@ -460,7 +460,7 @@ def provision_ip_trunk_isis_iface_dry(subscription: IptrunkInactive, process_id: ...@@ -460,7 +460,7 @@ def provision_ip_trunk_isis_iface_dry(subscription: IptrunkInactive, process_id:
@step("[FOR REAL] Provision IP trunk ISIS interface") @step("[FOR REAL] Provision IP trunk ISIS interface")
def provision_ip_trunk_isis_iface_real(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> LSOState: def provision_ip_trunk_isis_iface_real(subscription: IptrunkInactive, process_id: UUIDstr, tt_number: str) -> LSOState:
"""Deploy :term:`ISIS` configuration on both sides.""" """Deploy ISIS configuration on both sides."""
extra_vars = { extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)), "wfo_trunk_json": json.loads(json_dumps(subscription)),
"dry_run": False, "dry_run": False,
...@@ -486,7 +486,7 @@ def provision_ip_trunk_isis_iface_real(subscription: IptrunkInactive, process_id ...@@ -486,7 +486,7 @@ def provision_ip_trunk_isis_iface_real(subscription: IptrunkInactive, process_id
@step("Check ISIS adjacency") @step("Check ISIS adjacency")
def check_ip_trunk_isis(subscription: IptrunkInactive) -> LSOState: def check_ip_trunk_isis(subscription: IptrunkInactive) -> LSOState:
"""Run an Ansible playbook to confirm :term:`ISIS` adjacency.""" """Run an Ansible playbook to confirm ISIS adjacency."""
extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "isis"} extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "isis"}
return { return {
...@@ -498,7 +498,7 @@ def check_ip_trunk_isis(subscription: IptrunkInactive) -> LSOState: ...@@ -498,7 +498,7 @@ def check_ip_trunk_isis(subscription: IptrunkInactive) -> LSOState:
@step("Register DNS records for both sides of the trunk") @step("Register DNS records for both sides of the trunk")
def register_dns_records(subscription: IptrunkInactive) -> State: def register_dns_records(subscription: IptrunkInactive) -> State:
"""Register :term:`DNS` records for both sides of the newly created IPtrunk.""" """Register DNS records for both sides of the newly created IPtrunk."""
for index, side in enumerate(subscription.iptrunk.iptrunk_sides): for index, side in enumerate(subscription.iptrunk.iptrunk_sides):
fqdn = f"{side.iptrunk_side_ae_iface}-0.{side.iptrunk_side_node.router_fqdn}" fqdn = f"{side.iptrunk_side_ae_iface}-0.{side.iptrunk_side_node.router_fqdn}"
if not (subscription.iptrunk.iptrunk_ipv4_network and subscription.iptrunk.iptrunk_ipv6_network): if not (subscription.iptrunk.iptrunk_ipv4_network and subscription.iptrunk.iptrunk_ipv6_network):
...@@ -514,11 +514,11 @@ def register_dns_records(subscription: IptrunkInactive) -> State: ...@@ -514,11 +514,11 @@ def register_dns_records(subscription: IptrunkInactive) -> State:
@step("NextBox integration") @step("NextBox integration")
def reserve_interfaces_in_netbox(subscription: IptrunkInactive) -> State: def reserve_interfaces_in_netbox(subscription: IptrunkInactive) -> State:
"""Create the :term:`LAG` interfaces in NetBox and attach the LAG interfaces to the physical interfaces.""" """Create the LAG interfaces in NetBox and attach the LAG interfaces to the physical interfaces."""
nbclient = NetboxClient() nbclient = NetboxClient()
for trunk_side in subscription.iptrunk.iptrunk_sides: for trunk_side in subscription.iptrunk.iptrunk_sides:
if get_router_vendor(trunk_side.iptrunk_side_node.owner_subscription_id) == Vendor.NOKIA: if get_router_vendor(trunk_side.iptrunk_side_node.owner_subscription_id) == Vendor.NOKIA:
# Create :term:`LAG` interfaces # Create LAG interfaces
lag_interface: Interfaces = nbclient.create_interface( lag_interface: Interfaces = nbclient.create_interface(
iface_name=trunk_side.iptrunk_side_ae_iface, # type: ignore[arg-type] iface_name=trunk_side.iptrunk_side_ae_iface, # type: ignore[arg-type]
interface_type="lag", interface_type="lag",
...@@ -526,7 +526,7 @@ def reserve_interfaces_in_netbox(subscription: IptrunkInactive) -> State: ...@@ -526,7 +526,7 @@ def reserve_interfaces_in_netbox(subscription: IptrunkInactive) -> State:
description=str(subscription.subscription_id), description=str(subscription.subscription_id),
enabled=True, enabled=True,
) )
# Attach physical interfaces to :term:`LAG` # Attach physical interfaces to LAG
# Update interface description to subscription ID # Update interface description to subscription ID
# Reserve interfaces # Reserve interfaces
for interface in trunk_side.iptrunk_side_ae_members: for interface in trunk_side.iptrunk_side_ae_members:
...@@ -558,13 +558,13 @@ def _allocate_interfaces_in_netbox(iptrunk_side: IptrunkSideBlockInactive) -> No ...@@ -558,13 +558,13 @@ def _allocate_interfaces_in_netbox(iptrunk_side: IptrunkSideBlockInactive) -> No
@step("Allocate interfaces in Netbox for side A") @step("Allocate interfaces in Netbox for side A")
def netbox_allocate_side_a_interfaces(subscription: IptrunkInactive) -> None: def netbox_allocate_side_a_interfaces(subscription: IptrunkInactive) -> None:
"""Allocate the :term:`LAG` interfaces for the Nokia router on side A.""" """Allocate the LAG interfaces for the Nokia router on side A."""
_allocate_interfaces_in_netbox(subscription.iptrunk.iptrunk_sides[0]) _allocate_interfaces_in_netbox(subscription.iptrunk.iptrunk_sides[0])
@step("Allocate interfaces in Netbox for side B") @step("Allocate interfaces in Netbox for side B")
def netbox_allocate_side_b_interfaces(subscription: IptrunkInactive) -> None: def netbox_allocate_side_b_interfaces(subscription: IptrunkInactive) -> None:
"""Allocate the :term:`LAG` interfaces for the Nokia router on side B.""" """Allocate the LAG interfaces for the Nokia router on side B."""
_allocate_interfaces_in_netbox(subscription.iptrunk.iptrunk_sides[1]) _allocate_interfaces_in_netbox(subscription.iptrunk.iptrunk_sides[1])
...@@ -597,8 +597,8 @@ def create_iptrunk() -> StepList: ...@@ -597,8 +597,8 @@ def create_iptrunk() -> StepList:
* Reserve interfaces in Netbox * Reserve interfaces in Netbox
* Deploy configuration on the two sides of the trunk, first as a dry run * Deploy configuration on the two sides of the trunk, first as a dry run
* Check connectivity on the new trunk * Check connectivity on the new trunk
* Deploy the new :term:`ISIS` metric on the trunk, first as a dry run * Deploy the new ISIS metric on the trunk, first as a dry run
* Verify :term:`ISIS` adjacency * Verify ISIS adjacency
* Allocate the interfaces in Netbox * Allocate the interfaces in Netbox
* Set the subscription to active in the database * Set the subscription to active in the database
""" """
......
...@@ -114,9 +114,9 @@ def check_twamp_status(subscription: Iptrunk) -> LSOState: ...@@ -114,9 +114,9 @@ def check_twamp_status(subscription: Iptrunk) -> LSOState:
target=Target.MODIFY, target=Target.MODIFY,
) )
def deploy_twamp() -> StepList: def deploy_twamp() -> StepList:
"""Deploy a :term:`TWAMP` session on an IP trunk. """Deploy a TWAMP session on an IP trunk.
* Run the :term:`TWAMP` playbook, including an initial dry run * Run the TWAMP playbook, including an initial dry run
""" """
return ( return (
begin begin
......
...@@ -175,7 +175,7 @@ def netbox_reserve_interfaces( ...@@ -175,7 +175,7 @@ def netbox_reserve_interfaces(
description=str(subscription.subscription_id), description=str(subscription.subscription_id),
enabled=True, enabled=True,
) )
# Attach physical interfaces to :term:`LAG` # Attach physical interfaces to LAG
# Reserve interfaces # Reserve interfaces
for interface in new_lag_member_interfaces: for interface in new_lag_member_interfaces:
nbclient.attach_interface_to_lag( nbclient.attach_interface_to_lag(
...@@ -254,7 +254,7 @@ def check_ip_trunk_optical_levels_post( ...@@ -254,7 +254,7 @@ def check_ip_trunk_optical_levels_post(
def check_ip_trunk_lldp( def check_ip_trunk_lldp(
subscription: Iptrunk, new_node: Router, new_lag_member_interfaces: list[dict], replace_index: int subscription: Iptrunk, new_node: Router, new_lag_member_interfaces: list[dict], replace_index: int
) -> LSOState: ) -> LSOState:
"""Check :term:`LLDP` on the new trunk endpoints.""" """Check LLDP on the new trunk endpoints."""
extra_vars = { extra_vars = {
"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "wfo_ip_trunk_json": json.loads(json_dumps(subscription)),
"new_node": json.loads(json_dumps(new_node)), "new_node": json.loads(json_dumps(new_node)),
...@@ -491,7 +491,7 @@ def update_remaining_side_bfd_real( ...@@ -491,7 +491,7 @@ def update_remaining_side_bfd_real(
@step("Check BFD session over trunk") @step("Check BFD session over trunk")
def check_ip_trunk_bfd(subscription: Iptrunk, new_node: Router, replace_index: int) -> LSOState: def check_ip_trunk_bfd(subscription: Iptrunk, new_node: Router, replace_index: int) -> LSOState:
"""Check :term:`BFD` session across the new trunk.""" """Check BFD session across the new trunk."""
extra_vars = { extra_vars = {
"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "wfo_ip_trunk_json": json.loads(json_dumps(subscription)),
"new_node": json.loads(json_dumps(new_node)), "new_node": json.loads(json_dumps(new_node)),
...@@ -553,7 +553,7 @@ def deploy_new_isis( ...@@ -553,7 +553,7 @@ def deploy_new_isis(
process_id: UUIDstr, process_id: UUIDstr,
tt_number: str, tt_number: str,
) -> LSOState: ) -> LSOState:
"""Deploy :term:`ISIS` configuration.""" """Deploy ISIS configuration."""
extra_vars = { extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)), "wfo_trunk_json": json.loads(json_dumps(subscription)),
"new_node": json.loads(json_dumps(new_node)), "new_node": json.loads(json_dumps(new_node)),
...@@ -584,7 +584,7 @@ def deploy_new_isis( ...@@ -584,7 +584,7 @@ def deploy_new_isis(
@step("Check ISIS adjacency") @step("Check ISIS adjacency")
def check_ip_trunk_isis(subscription: Iptrunk, replace_index: int) -> LSOState: def check_ip_trunk_isis(subscription: Iptrunk, replace_index: int) -> LSOState:
"""Run an Ansible playbook to confirm :term:`ISIS` adjacency.""" """Run an Ansible playbook to confirm ISIS adjacency."""
extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "isis"} extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "isis"}
return { return {
...@@ -602,7 +602,7 @@ def check_ip_trunk_isis(subscription: Iptrunk, replace_index: int) -> LSOState: ...@@ -602,7 +602,7 @@ def check_ip_trunk_isis(subscription: Iptrunk, replace_index: int) -> LSOState:
@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM) @inputstep("Wait for confirmation", assignee=Assignee.SYSTEM)
def confirm_continue_restore_isis() -> FormGenerator: def confirm_continue_restore_isis() -> FormGenerator:
"""Wait for an operator to confirm that the old :term:`ISIS` metric should be restored.""" """Wait for an operator to confirm that the old ISIS metric should be restored."""
class ProvisioningResultPage(FormPage): class ProvisioningResultPage(FormPage):
model_config = ConfigDict(title="Please confirm before continuing") model_config = ConfigDict(title="Please confirm before continuing")
...@@ -621,7 +621,7 @@ def restore_isis_metric( ...@@ -621,7 +621,7 @@ def restore_isis_metric(
tt_number: str, tt_number: str,
old_isis_metric: int, old_isis_metric: int,
) -> LSOState: ) -> LSOState:
"""Restore the :term:`ISIS` metric to its original value.""" """Restore the ISIS metric to its original value."""
subscription.iptrunk.iptrunk_isis_metric = old_isis_metric subscription.iptrunk.iptrunk_isis_metric = old_isis_metric
extra_vars = { extra_vars = {
"wfo_trunk_json": json.loads(json_dumps(subscription)), "wfo_trunk_json": json.loads(json_dumps(subscription)),
...@@ -727,9 +727,9 @@ def delete_old_config_real( ...@@ -727,9 +727,9 @@ def delete_old_config_real(
@step("Update IP records in IPAM") @step("Update IP records in IPAM")
def update_ipam(subscription: Iptrunk, replace_index: int, new_node: Router, new_lag_interface: str) -> State: def update_ipam(subscription: Iptrunk, replace_index: int, new_node: Router, new_lag_interface: str) -> State:
"""Update :term:`IPAM` resources. """Update IPAM resources.
Move the :term:`DNS` record pointing to the old side of the trunk, to the new side. Move the DNS record pointing to the old side of the trunk, to the new side.
""" """
v4_addr = subscription.iptrunk.iptrunk_ipv4_network[replace_index] v4_addr = subscription.iptrunk.iptrunk_ipv4_network[replace_index]
# IPv6 networks start with an unused address we need to skip past. # IPv6 networks start with an unused address we need to skip past.
...@@ -778,7 +778,7 @@ def update_subscription_model( ...@@ -778,7 +778,7 @@ def update_subscription_model(
@step("Netbox: Remove old LAG interface") @step("Netbox: Remove old LAG interface")
def netbox_remove_old_interfaces(old_side_data: dict) -> State: def netbox_remove_old_interfaces(old_side_data: dict) -> State:
"""Remove the old :term:`LAG` interface from Netbox, only relevant if the old side is a Nokia router.""" """Remove the old LAG interface from Netbox, only relevant if the old side is a Nokia router."""
nbclient = NetboxClient() nbclient = NetboxClient()
for iface in old_side_data["iptrunk_side_ae_members"]: for iface in old_side_data["iptrunk_side_ae_members"]:
...@@ -797,7 +797,7 @@ def netbox_remove_old_interfaces(old_side_data: dict) -> State: ...@@ -797,7 +797,7 @@ def netbox_remove_old_interfaces(old_side_data: dict) -> State:
@step("Netbox: Allocate new LAG member interfaces") @step("Netbox: Allocate new LAG member interfaces")
def netbox_allocate_new_interfaces(subscription: Iptrunk, replace_index: int) -> State: def netbox_allocate_new_interfaces(subscription: Iptrunk, replace_index: int) -> State:
"""Allocate the new :term:`LAG` interface in Netbox. Only relevant if the new router is a Nokia.""" """Allocate the new LAG interface in Netbox. Only relevant if the new router is a Nokia."""
nbclient = NetboxClient() nbclient = NetboxClient()
new_side = subscription.iptrunk.iptrunk_sides[replace_index] new_side = subscription.iptrunk.iptrunk_sides[replace_index]
...@@ -835,15 +835,15 @@ def migrate_iptrunk() -> StepList: ...@@ -835,15 +835,15 @@ def migrate_iptrunk() -> StepList:
"""Migrate an IP trunk. """Migrate an IP trunk.
* Reserve new interfaces in Netbox * Reserve new interfaces in Netbox
* Set the :term:`ISIS` metric of the current trunk to an arbitrarily high value to drain all traffic * Set the ISIS metric of the current trunk to an arbitrarily high value to drain all traffic
* Disable - but do not delete - the old configuration on the routers, first as a dry run * Disable - but do not delete - the old configuration on the routers, first as a dry run
* Deploy the new configuration on the routers, first as a dry run * Deploy the new configuration on the routers, first as a dry run
* Wait for operator confirmation that the physical fiber has been moved before continuing * Wait for operator confirmation that the physical fiber has been moved before continuing
* Deploy a new :term:`ISIS` interface between routers A and C * Deploy a new ISIS interface between routers A and C
* Wait for operator confirmation that :term:`ISIS` is behaving as expected * Wait for operator confirmation that ISIS is behaving as expected
* Restore the old :term:`ISIS` metric on the new trunk * Restore the old ISIS metric on the new trunk
* Delete the old configuration from the routers, first as a dry run * Delete the old configuration from the routers, first as a dry run
* Reflect the changes made in :term:`IPAM` * Reflect the changes made in IPAM
* Update the subscription model in the database * Update the subscription model in the database
* Update the reserved interfaces in Netbox * Update the reserved interfaces in Netbox
""" """
......
...@@ -381,7 +381,7 @@ def _netbox_update_interfaces( ...@@ -381,7 +381,7 @@ def _netbox_update_interfaces(
for member in removed_ae_members: for member in removed_ae_members:
nbclient.free_interface(side_block.iptrunk_side_node.router_fqdn, member["interface_name"]) nbclient.free_interface(side_block.iptrunk_side_node.router_fqdn, member["interface_name"])
# Attach physical interfaces to :term:`LAG` # Attach physical interfaces to LAG
# Update interface description to subscription ID # Update interface description to subscription ID
# Reserve interfaces # Reserve interfaces
for interface in side_block.iptrunk_side_ae_members: for interface in side_block.iptrunk_side_ae_members:
...@@ -448,18 +448,18 @@ def _netbox_allocate_interfaces(side_block: IptrunkSideBlock, previous_ae_member ...@@ -448,18 +448,18 @@ def _netbox_allocate_interfaces(side_block: IptrunkSideBlock, previous_ae_member
@step("Netbox: Allocate side A interfaces") @step("Netbox: Allocate side A interfaces")
def allocate_interfaces_in_netbox_side_a(subscription: Iptrunk, previous_ae_members: list[list[dict]]) -> None: def allocate_interfaces_in_netbox_side_a(subscription: Iptrunk, previous_ae_members: list[list[dict]]) -> None:
"""Allocate the :term:`LAG` interfaces on side A in Netbox. """Allocate the LAG interfaces on side A in Netbox.
Attach the :term:`LAG` interface to the physical interface detach old one from the :term:`LAG`. Attach the LAG interface to the physical interface detach old one from the LAG.
""" """
_netbox_allocate_interfaces(subscription.iptrunk.iptrunk_sides[0], previous_ae_members[0]) _netbox_allocate_interfaces(subscription.iptrunk.iptrunk_sides[0], previous_ae_members[0])
@step("Netbox: Allocate side B interfaces") @step("Netbox: Allocate side B interfaces")
def allocate_interfaces_in_netbox_side_b(subscription: Iptrunk, previous_ae_members: list[list[dict]]) -> None: def allocate_interfaces_in_netbox_side_b(subscription: Iptrunk, previous_ae_members: list[list[dict]]) -> None:
"""Allocate the :term:`LAG` interface on side B in Netbox. """Allocate the LAG interface on side B in Netbox.
Attach the :term:`LAG` interface to the physical interface detach old one from the :term:`LAG`. Attach the LAG interface to the physical interface detach old one from the LAG.
""" """
_netbox_allocate_interfaces(subscription.iptrunk.iptrunk_sides[1], previous_ae_members[1]) _netbox_allocate_interfaces(subscription.iptrunk.iptrunk_sides[1], previous_ae_members[1])
......
...@@ -38,9 +38,10 @@ def validate_router_config(subscription: Iptrunk) -> LSOState: ...@@ -38,9 +38,10 @@ def validate_router_config(subscription: Iptrunk) -> LSOState:
@step("Verify IPAM resources for LAG interfaces") @step("Verify IPAM resources for LAG interfaces")
def verify_ipam_records(subscription: Iptrunk) -> None: def verify_ipam_records(subscription: Iptrunk) -> None:
"""Validate the :term:`IPAM` resources for the :term:`LAG` interfaces. """Validate the IPAM resources for the LAG interfaces.
Raises an :class:`orchestrator.utils.errors.ProcessFailureError` if :term:`IPAM` is configured incorrectly. Raises:
ProcessFailureError: if IPAM is configured incorrectly.
""" """
ipam_errors = [] ipam_errors = []
ipam_v4_network = infoblox.find_network_by_cidr(subscription.iptrunk.iptrunk_ipv4_network) ipam_v4_network = infoblox.find_network_by_cidr(subscription.iptrunk.iptrunk_ipv4_network)
...@@ -158,7 +159,7 @@ def verify_iptrunk_config(subscription: Iptrunk) -> LSOState: ...@@ -158,7 +159,7 @@ def verify_iptrunk_config(subscription: Iptrunk) -> LSOState:
@step("Check ISIS configuration") @step("Check ISIS configuration")
def check_ip_trunk_isis(subscription: Iptrunk) -> LSOState: def check_ip_trunk_isis(subscription: Iptrunk) -> LSOState:
"""Run an Ansible playbook to check for any :term:`ISIS` configuration drift.""" """Run an Ansible playbook to check for any ISIS configuration drift."""
return { return {
"playbook_name": "gap_ansible/playbooks/iptrunks.yaml", "playbook_name": "gap_ansible/playbooks/iptrunks.yaml",
"inventory": { "inventory": {
...@@ -207,10 +208,10 @@ def verify_twamp_config(subscription: Iptrunk) -> LSOState: ...@@ -207,10 +208,10 @@ def verify_twamp_config(subscription: Iptrunk) -> LSOState:
def validate_iptrunk() -> StepList: def validate_iptrunk() -> StepList:
"""Validate an existing, active IP Trunk subscription. """Validate an existing, active IP Trunk subscription.
* Verify that the :term:`LAG` interfaces are correctly configured in :term:`IPAM`. * Verify that the LAG interfaces are correctly configured in IPAM.
* Check correct configuration of interfaces in NetBox. * Check correct configuration of interfaces in NetBox.
* Verify the configuration on both sides of the trunk is intact. * Verify the configuration on both sides of the trunk is intact.
* Check the :term:`ISIS` metric of the trunk. * Check the ISIS metric of the trunk.
* Verify that TWAMP configuration is correct. * Verify that TWAMP configuration is correct.
If a trunk has a Juniper router on both sides, it is considered legacy and does not require validation. If a trunk has a Juniper router on both sides, it is considered legacy and does not require validation.
......
...@@ -28,7 +28,7 @@ from gso.utils.types.virtual_identifiers import VC_ID, VLAN_ID ...@@ -28,7 +28,7 @@ from gso.utils.types.virtual_identifiers import VC_ID, VLAN_ID
def initial_input_form_generator() -> FormGenerator: def initial_input_form_generator() -> FormGenerator:
"""Generate a form that can be pre-filled using an :term:`API` endpoint.""" """Generate a form that can be pre-filled using an API endpoint."""
class ServiceBindingPortInput(BaseModel): class ServiceBindingPortInput(BaseModel):
edge_port: UUIDstr edge_port: UUIDstr
...@@ -52,7 +52,7 @@ def initial_input_form_generator() -> FormGenerator: ...@@ -52,7 +52,7 @@ def initial_input_form_generator() -> FormGenerator:
@model_validator(mode="after") @model_validator(mode="after")
def tagged_layer_2_circuit_has_vlan_bounds(self) -> Self: def tagged_layer_2_circuit_has_vlan_bounds(self) -> Self:
"""If a Layer 2 Circuit is tagged, it must have a :term:`VLAN` range set.""" """If a Layer 2 Circuit is tagged, it must have a VLAN range set."""
if self.layer_2_circuit_type == Layer2CircuitType.TAGGED and ( if self.layer_2_circuit_type == Layer2CircuitType.TAGGED and (
self.vlan_range_lower_bound is None or self.vlan_range_upper_bound is None self.vlan_range_lower_bound is None or self.vlan_range_upper_bound is None
): ):
......
...@@ -26,7 +26,7 @@ from gso.utils.types.virtual_identifiers import VLAN_ID ...@@ -26,7 +26,7 @@ from gso.utils.types.virtual_identifiers import VLAN_ID
def initial_input_form_generator() -> FormGenerator: def initial_input_form_generator() -> FormGenerator:
"""Take all information passed to this workflow by the :term:`API` endpoint that was called.""" """Take all information passed to this workflow by the API endpoint that was called."""
class BFDSettingsModel(BaseModel): class BFDSettingsModel(BaseModel):
bfd_enabled: bool = False bfd_enabled: bool = False
......
...@@ -247,7 +247,7 @@ def check_sbp_functionality(subscription: dict[str, Any], edge_port_fqdn_list: l ...@@ -247,7 +247,7 @@ def check_sbp_functionality(subscription: dict[str, Any], edge_port_fqdn_list: l
def deploy_bgp_peers_dry( def deploy_bgp_peers_dry(
subscription: dict[str, Any], edge_port_fqdn_list: list[str], tt_number: str, process_id: UUIDstr, partner_name: str subscription: dict[str, Any], edge_port_fqdn_list: list[str], tt_number: str, process_id: UUIDstr, partner_name: str
) -> LSOState: ) -> LSOState:
"""Perform a dry run of deploying :term:`BGP` peers.""" """Perform a dry run of deploying BGP peers."""
extra_vars = { extra_vars = {
"subscription": subscription, "subscription": subscription,
"partner_name": partner_name, "partner_name": partner_name,
...@@ -269,7 +269,7 @@ def deploy_bgp_peers_dry( ...@@ -269,7 +269,7 @@ def deploy_bgp_peers_dry(
def deploy_bgp_peers_real( def deploy_bgp_peers_real(
subscription: dict[str, Any], edge_port_fqdn_list: list[str], tt_number: str, process_id: UUIDstr, partner_name: str subscription: dict[str, Any], edge_port_fqdn_list: list[str], tt_number: str, process_id: UUIDstr, partner_name: str
) -> LSOState: ) -> LSOState:
"""Deploy :term:`BGP` peers.""" """Deploy BGP peers."""
extra_vars = { extra_vars = {
"subscription": subscription, "subscription": subscription,
"partner_name": partner_name, "partner_name": partner_name,
...@@ -289,7 +289,7 @@ def deploy_bgp_peers_real( ...@@ -289,7 +289,7 @@ def deploy_bgp_peers_real(
@step("Check BGP peers") @step("Check BGP peers")
def check_bgp_peers(subscription: dict[str, Any], edge_port_fqdn_list: list[str]) -> LSOState: def check_bgp_peers(subscription: dict[str, Any], edge_port_fqdn_list: list[str]) -> LSOState:
"""Check correct deployment of :term:`BGP` peers.""" """Check correct deployment of BGP peers."""
extra_vars = {"subscription": subscription, "verb": "check", "object": "bgp"} extra_vars = {"subscription": subscription, "verb": "check", "object": "bgp"}
return { return {
...@@ -301,7 +301,7 @@ def check_bgp_peers(subscription: dict[str, Any], edge_port_fqdn_list: list[str] ...@@ -301,7 +301,7 @@ def check_bgp_peers(subscription: dict[str, Any], edge_port_fqdn_list: list[str]
@step("Update Infoblox") @step("Update Infoblox")
def update_dns_records(subscription: L3CoreService) -> State: def update_dns_records(subscription: L3CoreService) -> State:
"""Update :term:`DNS` records in Infoblox.""" """Update DNS records in Infoblox."""
# TODO: implement # TODO: implement
return {"subscription": subscription} return {"subscription": subscription}
...@@ -316,8 +316,8 @@ def create_l3_core_service() -> StepList: ...@@ -316,8 +316,8 @@ def create_l3_core_service() -> StepList:
* Create subscription object in the service database * Create subscription object in the service database
* Deploy service binding ports * Deploy service binding ports
* Deploy :term:`BGP` peers * Deploy BGP peers
* Update :term:`DNS` records * Update DNS records
* Set the subscription in a provisioning state in the database * Set the subscription in a provisioning state in the database
""" """
return ( return (
......
...@@ -246,7 +246,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ...@@ -246,7 +246,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
@step("Clean up removed Edge Ports") @step("Clean up removed Edge Ports")
def remove_old_sbp_blocks(subscription: L3CoreService, removed_access_ports: list[UUIDstr]) -> State: def remove_old_sbp_blocks(subscription: L3CoreService, removed_access_ports: list[UUIDstr]) -> State:
"""Remove old :term:`SBP` product blocks from the GÉANT IP subscription.""" """Remove old SBP product blocks from the GÉANT IP subscription."""
subscription.l3_core_service.ap_list = [ subscription.l3_core_service.ap_list = [
ap ap
for ap in subscription.l3_core_service.ap_list for ap in subscription.l3_core_service.ap_list
...@@ -291,7 +291,7 @@ def modify_existing_sbp_blocks(subscription: L3CoreService, modified_sbp_list: l ...@@ -291,7 +291,7 @@ def modify_existing_sbp_blocks(subscription: L3CoreService, modified_sbp_list: l
@step("Instantiate new Service Binding Ports") @step("Instantiate new Service Binding Ports")
def create_new_sbp_blocks(subscription: L3CoreService, added_service_binding_ports: list[dict[str, Any]]) -> State: def create_new_sbp_blocks(subscription: L3CoreService, added_service_binding_ports: list[dict[str, Any]]) -> State:
"""Add new two :term:`SBP` to the L3 Core Service subscription.""" """Add new two SBP to the L3 Core Service subscription."""
for sbp_input in added_service_binding_ports: for sbp_input in added_service_binding_ports:
edge_port = EdgePort.from_subscription(sbp_input["edge_port_id"]) edge_port = EdgePort.from_subscription(sbp_input["edge_port_id"])
bgp_session_list = [ bgp_session_list = [
......
"""Import an existing :term:`LAN` Switch Interconnect into the subscription database.""" """Import an existing LAN Switch Interconnect into the subscription database."""
from uuid import uuid4 from uuid import uuid4
...@@ -88,7 +88,7 @@ def initialize_subscription( ...@@ -88,7 +88,7 @@ def initialize_subscription(
"Create Imported LAN Switch Interconnect", initial_input_form=_initial_input_form_generator, target=Target.CREATE "Create Imported LAN Switch Interconnect", initial_input_form=_initial_input_form_generator, target=Target.CREATE
) )
def create_imported_lan_switch_interconnect() -> StepList: def create_imported_lan_switch_interconnect() -> StepList:
"""Create an imported :term:`LAN` Switch Interconnect without provisioning it.""" """Create an imported LAN Switch Interconnect without provisioning it."""
return ( return (
begin begin
>> create_subscription >> create_subscription
......
"""Import an existing :term:`LAN` Switch Interconnect into the service database.""" """Import an existing LAN Switch Interconnect into the service database."""
from orchestrator import step, workflow from orchestrator import step, workflow
from orchestrator.targets import Target from orchestrator.targets import Target
......
...@@ -37,7 +37,7 @@ def _input_form_generator(subscription_id: UUIDstr) -> FormGenerator: ...@@ -37,7 +37,7 @@ def _input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
target=Target.TERMINATE, target=Target.TERMINATE,
) )
def terminate_lan_switch_interconnect() -> StepList: def terminate_lan_switch_interconnect() -> StepList:
"""Terminate a :term:`LAN` Switch Interconnect.""" """Terminate a LAN Switch Interconnect."""
return ( return (
begin begin
>> store_process_subscription(Target.TERMINATE) >> store_process_subscription(Target.TERMINATE)
......
"""Validation workflow for :term:`LAN` Switch Interconnect subscription objects.""" """Validation workflow for LAN Switch Interconnect subscription objects."""
from typing import Any from typing import Any
...@@ -29,7 +29,7 @@ def validate_config(subscription: dict[str, Any]) -> LSOState: ...@@ -29,7 +29,7 @@ def validate_config(subscription: dict[str, Any]) -> LSOState:
"Validate LAN Switch Interconnect", target=Target.SYSTEM, initial_input_form=(wrap_modify_initial_input_form(None)) "Validate LAN Switch Interconnect", target=Target.SYSTEM, initial_input_form=(wrap_modify_initial_input_form(None))
) )
def validate_lan_switch_interconnect() -> StepList: def validate_lan_switch_interconnect() -> StepList:
"""Validate an existing :term:`LAN` Switch Interconnect.""" """Validate an existing LAN Switch Interconnect."""
return ( return (
begin begin
>> store_process_subscription(Target.SYSTEM) >> store_process_subscription(Target.SYSTEM)
......
...@@ -31,7 +31,7 @@ def create_subscription(partner: str) -> State: ...@@ -31,7 +31,7 @@ def create_subscription(partner: str) -> State:
def initial_input_form_generator() -> FormGenerator: def initial_input_form_generator() -> FormGenerator:
"""Generate a form that is filled in using information passed through the :term:`API` endpoint.""" """Generate a form that is filled in using information passed through the API endpoint."""
class ImportOfficeRouter(FormPage): class ImportOfficeRouter(FormPage):
model_config = ConfigDict(title="Import an office router") model_config = ConfigDict(title="Import an office router")
......
...@@ -29,7 +29,7 @@ def create_subscription(partner: str) -> State: ...@@ -29,7 +29,7 @@ def create_subscription(partner: str) -> State:
def initial_input_form_generator() -> FormGenerator: def initial_input_form_generator() -> FormGenerator:
"""Generate a form that is filled in using information passed through the :term:`API` endpoint.""" """Generate a form that is filled in using information passed through the API endpoint."""
class ImportOpengear(FormPage): class ImportOpengear(FormPage):
model_config = ConfigDict(title="Import Opengear") model_config = ConfigDict(title="Import Opengear")
......
...@@ -29,7 +29,7 @@ def create_subscription(partner: str) -> State: ...@@ -29,7 +29,7 @@ def create_subscription(partner: str) -> State:
def initial_input_form_generator() -> FormGenerator: def initial_input_form_generator() -> FormGenerator:
"""Generate a form that is filled in using information passed through the :term:`API` endpoint.""" """Generate a form that is filled in using information passed through the API endpoint."""
class ImportRouter(FormPage): class ImportRouter(FormPage):
model_config = ConfigDict(title="Import Router") model_config = ConfigDict(title="Import Router")
......
...@@ -183,9 +183,10 @@ def create_netbox_device(subscription: RouterInactive) -> State: ...@@ -183,9 +183,10 @@ def create_netbox_device(subscription: RouterInactive) -> State:
@step("Verify IPAM resources for loopback interface") @step("Verify IPAM resources for loopback interface")
def verify_ipam_loopback(subscription: RouterInactive) -> None: def verify_ipam_loopback(subscription: RouterInactive) -> None:
"""Validate the :term:`IPAM` resources for the loopback interface. """Validate the IPAM resources for the loopback interface.
Raises an :class:`orchestrator.utils.errors.ProcessFailureError` if :term:`IPAM` is configured incorrectly. Raises:
ProcessFailureError: If IPAM is configured incorrectly.
""" """
host_record = infoblox.find_host_by_fqdn(f"lo0.{subscription.router.router_fqdn}") host_record = infoblox.find_host_by_fqdn(f"lo0.{subscription.router.router_fqdn}")
if not host_record or str(subscription.subscription_id) not in host_record.comment: if not host_record or str(subscription.subscription_id) not in host_record.comment:
...@@ -289,9 +290,9 @@ def create_router() -> StepList: ...@@ -289,9 +290,9 @@ def create_router() -> StepList:
"""Create a new router in the service database. """Create a new router in the service database.
* Create and initialise the subscription object in the service database * Create and initialise the subscription object in the service database
* Allocate :term:`IPAM` resources for the loopback interface * Allocate IPAM resources for the loopback interface
* Deploy configuration on the new router, first as a dry run * Deploy configuration on the new router, first as a dry run
* Validate :term:`IPAM` resources * Validate IPAM resources
* Create a new device in Netbox * Create a new device in Netbox
""" """
router_is_nokia = conditional(lambda state: state["vendor"] == Vendor.NOKIA) router_is_nokia = conditional(lambda state: state["vendor"] == Vendor.NOKIA)
......
"""A modification workflow for setting a new :term:`ISIS` metric for an IP trunk.""" """A modification workflow for setting a new ISIS metric for an IP trunk."""
from orchestrator.targets import Target from orchestrator.targets import Target
from orchestrator.types import State, UUIDstr from orchestrator.types import State, UUIDstr
......
...@@ -208,7 +208,7 @@ def prompt_radius_login() -> FormGenerator: ...@@ -208,7 +208,7 @@ def prompt_radius_login() -> FormGenerator:
@step("Update subscription model") @step("Update subscription model")
def update_subscription_model(subscription: Router) -> State: def update_subscription_model(subscription: Router) -> State:
"""Update the database model, such that it should not be reached via :term:`OOB` access anymore.""" """Update the database model, such that it should not be reached via OOB access anymore."""
subscription.router.router_access_via_ts = False subscription.router.router_access_via_ts = False
return {"subscription": subscription} return {"subscription": subscription}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment