Skip to content
Snippets Groups Projects
Verified Commit 26659a98 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Merge remote-tracking branch 'origin/develop' into feature/use-async-steps

parents 8c8d65ed db2473dc
No related branches found
No related tags found
1 merge request!96Make use of new callback step for external provisioning
......@@ -2,7 +2,7 @@
stages:
- tox
- documentation
- sonarqube
include:
- docs/.gitlab-ci.yml
......@@ -16,6 +16,7 @@ run-tox-pipeline:
services:
- postgres:15.4
# Change pip's cache directory to be inside the project directory since we can
# only cache local items.
variables:
......@@ -46,3 +47,11 @@ run-tox-pipeline:
artifacts:
paths:
- htmlcov
sonarqube:
stage: sonarqube
image: sonarsource/sonar-scanner-cli
script:
- sonar-scanner -Dsonar.login=$SONAR_TOKEN -Dproject.settings=./sonar.properties
tags:
- docker-executor
import logging
import os
import orchestrator
from alembic import context
from orchestrator.db.database import BaseModel
from orchestrator.settings import app_settings
from sqlalchemy import engine_from_config, pool
from sqlalchemy import engine_from_config, pool, text
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
......@@ -17,17 +15,8 @@ logger = logging.getLogger("alembic.env")
config.set_main_option("sqlalchemy.url", app_settings.DATABASE_URI)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = BaseModel.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
......@@ -85,8 +74,10 @@ def run_migrations_online() -> None:
)
try:
with context.begin_transaction():
connection.execute(text("SELECT pg_advisory_xact_lock(1000);"))
context.run_migrations()
finally:
connection.execute(text("SELECT pg_advisory_unlock(1000);"))
connection.close()
......
......@@ -3,7 +3,6 @@ from uuid import UUID
import pydantic
import pynetbox
from infoblox_client.objects import Interface
from pynetbox.models.dcim import Devices, DeviceTypes, Interfaces
from gso.products.product_types.router import Router
......@@ -63,12 +62,22 @@ class NetboxClient:
)
def get_device_by_name(self, device_name: str) -> Devices:
"""Return the device object by name from netbox, or ``None`` if not found."""
return self.netbox.dcim.devices.get(name=device_name)
"""Return the device object by name from netbox, or raise not found."""
device = self.netbox.dcim.devices.get(name=device_name)
if device is None:
raise NotFoundError(f"Device: {device_name} not found.")
return device
def get_interfaces_by_device(self, device_name: str, speed: str) -> list[Interfaces]:
"""Get all interfaces of a device by name and speed."""
def get_interface_by_name_and_device(self, iface_name: str, device_name: str) -> Interfaces:
"""Return the interface lists by name and device name from netbox."""
device = self.get_device_by_name(device_name)
interface = self.netbox.dcim.interfaces.get(device_id=device.id, name=iface_name)
if interface is None:
raise NotFoundError(f"Interface: {iface_name} on device with id: {device.id} not found.")
return interface
def get_interfaces_by_device(self, device_name: str, speed: str) -> list[Interfaces]:
"""Get all interfaces of a device by name and speed that are not reserved and not allocated."""
device = self.get_device_by_name(device_name)
return list(
self.netbox.dcim.interfaces.filter(device_id=device.id, enabled=False, mark_connected=False, speed=speed)
......@@ -84,6 +93,7 @@ class NetboxClient:
Returns the new interface object as dict.
"""
device = self.get_device_by_name(device_name)
return self.netbox.dcim.interfaces.create(
name=iface_name,
type=type,
......@@ -96,8 +106,7 @@ class NetboxClient:
def delete_interface(self, device_name: str, iface_name: str) -> None:
"""Delete an interface from a device by name."""
device = self.get_device_by_name(device_name)
interface = self.netbox.dcim.interfaces.get(device_id=device.id, name=iface_name)
interface = self.get_interface_by_name_and_device(iface_name, device_name)
return interface.delete()
def create_device_type(self, manufacturer: str, model: str, slug: str) -> DeviceTypes:
......@@ -131,7 +140,7 @@ class NetboxClient:
return int("".join(filter(str.isdigit, type_parts[0]))) * 1000000
return None
def create_device(self, router_name: str, site_tier: str) -> Devices:
def create_device(self, device_name: str, site_tier: str) -> Devices:
"""Create a new device in Netbox."""
# Get device type id
......@@ -146,7 +155,7 @@ class NetboxClient:
# Create new device
device = self.netbox.dcim.devices.create(
name=router_name, device_type=device_type.id, role=device_role.id, site=device_site.id
name=device_name, device_type=device_type.id, role=device_role.id, site=device_site.id
)
module_bays = list(self.netbox.dcim.module_bays.filter(device_id=device.id))
card_type = self.netbox.dcim.module_types.get(model=tier_info.module_type)
......@@ -168,8 +177,9 @@ class NetboxClient:
return device
def delete_device(self, router_name: str) -> None:
self.netbox.dcim.devices.get(name=router_name).delete()
def delete_device(self, device_name: str) -> None:
"""Delete device by name."""
self.netbox.dcim.devices.get(name=device_name).delete()
return
def attach_interface_to_lag(
......@@ -179,16 +189,16 @@ class NetboxClient:
Returns the interface object after assignment.
"""
# Get device id
device = self.get_device_by_name(device_name)
# Get interface for device
iface = self.netbox.dcim.interfaces.get(name=iface_name, device_id=device.id)
iface = self.get_interface_by_name_and_device(iface_name, device_name)
# Get LAG
lag = self.netbox.dcim.interfaces.get(name=lag_name, device_id=device.id)
lag = self.get_interface_by_name_and_device(lag_name, device_name)
# Assign interface to LAG
# Assign interface to LAG, ensuring it doesn't already belong to a LAG
if iface.lag:
raise WorkflowStateError(
f"The interface: {iface_name} on device: {device_name} already belongs to a LAG: {iface.lag.name}."
)
iface.lag = lag.id
# Set description if provided
......@@ -196,19 +206,13 @@ class NetboxClient:
iface.description = description
iface.save()
return iface
def reserve_interface(self, device_name: str, iface_name: str) -> Interfaces:
"""Reserve an interface by enabling it."""
# First get interface from device
device = self.get_device_by_name(device_name)
interface = self.netbox.dcim.interfaces.get(device_id=device.id, name=iface_name)
# Reserve interface by enabling it
if interface is None:
raise NotFoundError(f"Interface: {iface_name} on device: {device_name} not found.")
interface = self.get_interface_by_name_and_device(iface_name, device_name)
# Check if interface is reserved
if interface.enabled:
......@@ -223,18 +227,14 @@ class NetboxClient:
def allocate_interface(self, device_name: str, iface_name: str) -> Interfaces:
"""Allocate an interface by marking it as connected."""
device = self.get_device_by_name(device_name)
interface = self.netbox.dcim.interfaces.get(device_id=device.id, name=iface_name)
# Check if interface is available
if interface is None:
raise NotFoundError(f"Interface: {iface_name} on device: {device_name} not found.")
# First get interface from device
interface = self.get_interface_by_name_and_device(iface_name, device_name)
# Check if interface is reserved
if interface.mark_connected:
raise WorkflowStateError(f"The interface: {iface_name} on device: {device_name} is already allocated.")
# allocate interface by mark as connected
# Allocate interface by marking it as connected
interface.mark_connected = True
interface.save()
......@@ -243,13 +243,8 @@ class NetboxClient:
def free_interface(self, device_name: str, iface_name: str) -> Interfaces:
"""Free interface by marking disconnect and disable it."""
device = self.get_device_by_name(device_name)
interface = self.netbox.dcim.interfaces.get(device_id=device.id, name=iface_name)
# Check if interface is available
if interface is None:
raise NotFoundError(f"Interface: {iface_name} on device: {device_name} not found.")
# First get interface from device
interface = self.get_interface_by_name_and_device(iface_name, device_name)
interface.mark_connected = False
interface.enabled = False
interface.description = ""
......@@ -302,13 +297,3 @@ class NetboxClient:
return self.netbox.dcim.interfaces.filter(
device=device.name, enabled=False, mark_connected=False, speed=speed_bps
)
def get_interface_by_name_and_device(self, router_id: UUID, interface_name: str) -> Interface:
"""Return the interface object by name and device from netbox, or ``None`` if not found."""
router = Router.from_subscription(router_id).router.router_fqdn
device = self.get_device_by_name(router)
try:
return self.netbox.dcim.interfaces.get(device=device.name, name=interface_name)
except pynetbox.RequestError:
raise NotFoundError(f"Interface: {interface_name} on device: {device.name} not found.")
......@@ -65,7 +65,9 @@ def available_interfaces_choices_including_current_members(
available_interfaces = list(NetboxClient().get_available_interfaces(router_id, speed))
available_interfaces.extend(
[
NetboxClient().get_interface_by_name_and_device(router_id, interface.interface_name)
NetboxClient().get_interface_by_name_and_device(
interface.interface_name, Router.from_subscription(router_id).router.router_fqdn
)
for interface in interfaces
]
)
......
......@@ -8,8 +8,10 @@ from orchestrator.workflow import StepList, conditional, done, init, step, workf
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_blocks.router import RouterVendor
from gso.products.product_types.iptrunk import Iptrunk
from gso.services import infoblox, provisioning_proxy
from gso.services.netbox_client import NetboxClient
from gso.services.provisioning_proxy import pp_interaction
from gso.utils.helpers import set_isis_to_90000
......@@ -51,6 +53,22 @@ def deprovision_ip_trunk_real(subscription: Iptrunk, process_id: UUIDstr, callba
return {"subscription": subscription}
@step("Remove IP Trunk from NetBox")
def free_interfaces_in_netbox(subscription: Iptrunk) -> State:
for side in [0, 1]:
router = subscription.iptrunk.iptrunk_sides[side].iptrunk_side_node
router_fqdn = router.router_fqdn
if router.router_vendor == RouterVendor.NOKIA:
nbclient = NetboxClient()
# Remove physical interfaces from LAGs
for member in subscription.iptrunk.iptrunk_sides[side].iptrunk_side_ae_members:
nbclient.free_interface(router_fqdn, member.interface_name)
# Delete LAGs
nbclient.delete_interface(router_fqdn, subscription.iptrunk.iptrunk_sides[side].iptrunk_side_ae_iface)
return {"subscription": subscription}
@step("Deprovision IPv4 networks")
def deprovision_ip_trunk_ipv4(subscription: Iptrunk) -> dict:
infoblox.delete_network(ipaddress.IPv4Network(subscription.iptrunk.iptrunk_ipv4_network))
......@@ -87,6 +105,7 @@ def terminate_iptrunk() -> StepList:
>> store_process_subscription(Target.TERMINATE)
>> unsync
>> run_config_steps(config_steps)
>> free_interfaces_in_netbox
>> run_ipam_steps(ipam_steps)
>> set_status(SubscriptionLifecycle.TERMINATED)
>> resync
......
sonar.projectKey=gso
sonar.projectName=GSO
sonar.projectVersion=0.x
sonar.sources=gso
sonar.python.coverage.reportPaths=coverage.xml
sonar.host.url=https://sonarqube.software.geant.org/
\ No newline at end of file
......@@ -59,6 +59,7 @@ def interface():
"type": "1000BaseT",
"enabled": False,
"mark_connected": False,
"lag": None,
}
return Record(values, None, None)
......@@ -247,7 +248,7 @@ def test_delete_device(mock_api, device, data_config_filename: PathLike):
def test_get_interfaces_by_device(mock_api, device, interface, data_config_filename: PathLike):
"""Test if a interface is returned for a device."""
# Setup interface speed
speed = 1000
speed = "1000"
# Mock netbox api
mock_api.return_value.dcim.devices.get.return_value = device
......@@ -289,3 +290,24 @@ def test_attach_interface_to_lag(mock_api, device, interface, lag, data_config_f
assert lag_interface.lag == lag.id
assert lag_interface.description == description
mock_save.assert_called_once()
@patch("gso.services.netbox_client.pynetbox.api")
def test_free_interface(mock_api, device, interface):
device_name = "mx1.lab.geant.net"
interface_name = "et-0/0/1"
# Define mock calls
mock_api.return_value.dcim.devices.get.return_value = device
mock_api.return_value.dcim.interfaces.get.return_value = interface
# Create a NetboxClient instance
netbox_client = NetboxClient()
# Test free_interface method on success
interface.mark_connected = True
interface.enabled = True
cleared_interface = netbox_client.free_interface(device_name, interface_name)
assert cleared_interface.enabled is False
assert cleared_interface.mark_connected is False
assert cleared_interface.description == ""
......@@ -20,7 +20,6 @@ from test.workflows.iptrunk.test_create_iptrunk import MockedNetboxClient
@pytest.mark.workflow
@patch("gso.workflows.iptrunk.migrate_iptrunk.provisioning_proxy.migrate_ip_trunk")
@patch("gso.workflows.iptrunk.migrate_iptrunk.provisioning_proxy.provision_ip_trunk")
@patch("gso.services.netbox_client.NetboxClient.get_device_by_name")
@patch("gso.services.netbox_client.NetboxClient.get_available_interfaces")
@patch("gso.services.netbox_client.NetboxClient.get_available_lags")
@patch("gso.services.netbox_client.NetboxClient.create_interface")
......@@ -38,7 +37,6 @@ def test_migrate_iptrunk_success(
mocked_create_interface,
mocked_get_available_lags,
mocked_get_available_interfaces,
mocked_get_device_by_name,
mock_provision_ip_trunk,
mock_migrate_ip_trunk,
iptrunk_subscription_factory,
......@@ -48,7 +46,6 @@ def test_migrate_iptrunk_success(
):
# Set up mock return values
mocked_netbox = MockedNetboxClient()
mocked_get_device_by_name.return_value = mocked_netbox.get_device_by_name()
mocked_get_available_interfaces.return_value = mocked_netbox.get_available_interfaces()
mocked_attach_interface_to_lag.return_value = mocked_netbox.attach_interface_to_lag()
mocked_reserve_interface.return_value = mocked_netbox.reserve_interface()
......
......@@ -17,7 +17,6 @@ from test.workflows.iptrunk.test_create_iptrunk import MockedNetboxClient
@pytest.mark.workflow
@patch("gso.workflows.iptrunk.modify_trunk_interface.provisioning_proxy.provision_ip_trunk")
@patch("gso.services.netbox_client.NetboxClient.get_device_by_name")
@patch("gso.services.netbox_client.NetboxClient.get_available_interfaces")
@patch("gso.services.netbox_client.NetboxClient.attach_interface_to_lag")
@patch("gso.services.netbox_client.NetboxClient.reserve_interface")
......@@ -31,14 +30,12 @@ def test_iptrunk_modify_trunk_interface_success(
mocked_reserve_interface,
mocked_attach_interface_to_lag,
mocked_get_available_interfaces,
mocked_get_device_by_name,
mock_provision_ip_trunk,
iptrunk_subscription_factory,
faker,
):
# Set up mock return values
mocked_netbox = MockedNetboxClient()
mocked_get_device_by_name.return_value = mocked_netbox.get_device_by_name()
mocked_get_available_interfaces.return_value = mocked_netbox.get_available_interfaces()
mocked_attach_interface_to_lag.return_value = mocked_netbox.attach_interface_to_lag()
mocked_reserve_interface.return_value = mocked_netbox.reserve_interface()
......
......@@ -3,6 +3,7 @@ from unittest.mock import patch
import pytest
from gso.products import Iptrunk
from test.services.conftest import MockedNetboxClient
from test.workflows import (
assert_complete,
assert_suspended,
......@@ -17,7 +18,11 @@ from test.workflows import (
@patch("gso.workflows.iptrunk.terminate_iptrunk.provisioning_proxy.provision_ip_trunk")
@patch("gso.workflows.iptrunk.terminate_iptrunk.provisioning_proxy.deprovision_ip_trunk")
@patch("gso.workflows.iptrunk.terminate_iptrunk.infoblox.delete_network")
def test_iptrunk_modify_isis_metric_success(
@patch("gso.services.netbox_client.NetboxClient.delete_interface")
@patch("gso.services.netbox_client.NetboxClient.free_interface")
def test_successful_iptrunk_termination(
mocked_free_interface,
mocked_delete_interface,
mock_infoblox_delete_network,
mock_deprovision_ip_trunk,
mock_provision_ip_trunk,
......@@ -26,6 +31,9 @@ def test_iptrunk_modify_isis_metric_success(
):
# Set up mock return values
product_id = iptrunk_subscription_factory()
mocked_netbox = MockedNetboxClient()
mocked_delete_interface.return_value = mocked_netbox.delete_interface()
mocked_free_interface.return_value = mocked_netbox.free_interface()
# Run workflow
initial_iptrunk_data = [
......@@ -54,6 +62,10 @@ def test_iptrunk_modify_isis_metric_success(
assert_complete(result)
# Check NetboxClient calls
assert mocked_delete_interface.call_count == 2 # once for each side
assert mocked_free_interface.call_count == 4 # Free interfaces for each side(2 per side)
state = extract_state(result)
subscription_id = state["subscription_id"]
subscription = Iptrunk.from_subscription(subscription_id)
......
"""Command line tool to communicate withthe NetBox API."""
"""Command line tool to communicate with the NetBox API."""
from typing import Any, Dict, List
import click
......@@ -38,7 +38,7 @@ def device(fqdn: str, model: str) -> None:
@create.command()
@click.option("--name", help="Interfacename")
@click.option("--name", help="Interface name")
@click.option("--type", default="10gbase-t", help="Interface type, default is 10GBASE-T")
@click.option("--speed", default="1000", help="Interface speed , default is 1000")
@click.option("--fqdn", help="Device where to create interface")
......@@ -131,41 +131,99 @@ list.add_command(interfaces)
list.add_command(devices)
# Define here attach command
# Define delete commands here
@cli.group()
def attach() -> None:
def delete() -> None:
pass
@attach.command()
@click.option("--fqdn", help="Device name where to attach interface to lag")
@click.option("--iface", help="Interface name to attach to lag")
@click.option("--lag", help="LAG name to attach interface")
def interface_to_lag(fqdn: str, iface: str, lag: str) -> None:
click.echo(f"Attaching interface to lag: device ={fqdn}, interface name={iface} to lag={lag}")
new_iface = NetboxClient().attach_interface_to_lag(fqdn, lag, iface)
click.echo(new_iface)
@delete.command() # type: ignore[no-redef]
@click.option("--fqdn", help="Name of device to delete")
def device(fqdn: str) -> None:
click.echo(f"Deleting device: device={fqdn}")
NetboxClient().delete_device(fqdn)
attach.add_command(interface_to_lag)
@delete.command() # type: ignore[no-redef]
@click.option("--fqdn", help="Device name from where to get interface to delete")
@click.option("--iface", help="Name of interface name to delete")
def interface(fqdn: str, iface: str) -> None:
click.echo(f"Deleting interface: device={fqdn}, interface name={iface}")
NetboxClient().delete_interface(fqdn, iface)
# The reserve command
delete.add_command(device)
delete.add_command(interface)
# The action command
@cli.group()
def reserve() -> None:
def action() -> None:
pass
@reserve.command()
@click.option("--fqdn", help="Device name where to get interface to reserve")
@click.option("--iface", help="Interface name to reserve")
@action.command()
@click.option("--fqdn", help="Device name from where to get interface to edit")
@click.option("--iface", help="Interface name to edit")
def reserve_interface(fqdn: str, iface: str) -> None:
click.echo(f"Reserving interface: device ={fqdn}, interface name={iface}")
reserved_iface = NetboxClient().reserve_interface(fqdn, iface)
click.echo(reserved_iface)
reserve.add_command(reserve_interface)
@action.command()
@click.option("--fqdn", help="Device name from where to get interface to edit")
@click.option("--iface", help="Interface name to edit")
def free_interface(fqdn: str, iface: str) -> None:
click.echo(f"Freeing interface: device={fqdn}, interface name={iface}")
freed_iface = NetboxClient().free_interface(fqdn, iface)
click.echo(freed_iface)
@action.command()
@click.option("--fqdn", help="Device name from where to get interface to edit")
@click.option("--iface", help="Interface name to edit")
def allocate_interface(fqdn: str, iface: str) -> None:
click.echo(f"Allocating interface: device={fqdn}, interface name={iface}")
allocated_iface = NetboxClient().allocate_interface(fqdn, iface)
click.echo(allocated_iface)
@action.command()
@click.option("--fqdn", help="Device name from where to get interface to edit")
@click.option("--iface", help="Interface name to edit")
def deallocate_interface(fqdn: str, iface: str) -> None:
click.echo(f"Deallocating interface: device={fqdn}, interface name={iface}")
deallocated_iface = NetboxClient().free_interface(fqdn, iface)
click.echo(deallocated_iface)
@action.command()
@click.option("--fqdn", help="Device name from where to get physical interface to attach LAG")
@click.option("--lag", help="LAG name to attach physical interface to")
@click.option("--iface", help="Interface name to attach to LAG")
def attach_interface_to_lag(fqdn: str, lag: str, iface: str) -> None:
click.echo(f"Attaching LAG to physical interface: device={fqdn}, LAG name={lag}, interface name={iface}")
attached_iface = NetboxClient().attach_interface_to_lag(fqdn, lag, iface)
click.echo(attached_iface)
@action.command()
@click.option("--fqdn", help="Device name from where to get physical interface to detach LAG")
@click.option("--lag", help="LAG name to detach from physical interface")
@click.option("--iface", help="Interface name to detach LAG from")
def detach_interface_from_lag(fqdn: str, lag: str, iface: str) -> None:
click.echo(f"Detaching LAG from physical interface: device={fqdn}, LAG name={lag}, interface name={iface}")
NetboxClient().detach_interfaces_from_lag(fqdn, lag)
click.echo(f"Detached LAG from physical interface: device={fqdn}, LAG name={lag}, interface name={iface}")
action.add_command(reserve_interface)
action.add_command(free_interface)
action.add_command(allocate_interface)
action.add_command(deallocate_interface)
action.add_command(attach_interface_to_lag)
action.add_command(detach_interface_from_lag)
if __name__ == "__main__":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment