Skip to content
Snippets Groups Projects
Verified Commit 891cf643 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

resolve more linting errors

parent 89de0cf3
Branches
Tags
1 merge request!111Feature/ruff everything party hat emoji
Showing
with 76 additions and 34 deletions
......@@ -4,6 +4,7 @@ import typer
from orchestrator import OrchestratorCore, app_settings
from orchestrator.cli.main import app as cli_app
# noinspection PyUnresolvedReferences
import gso.products
import gso.workflows # noqa: F401
from gso.api import router as api_router
......
......@@ -39,7 +39,12 @@ def set_isis_to_90000(subscription: Iptrunk, process_id: UUIDstr, callback_route
old_isis_metric = subscription.iptrunk.iptrunk_isis_metric
subscription.iptrunk.iptrunk_isis_metric = 90000
provisioning_proxy.provision_ip_trunk(
subscription, process_id, callback_route, tt_number, "isis_interface", dry_run=False,
subscription,
process_id,
callback_route,
tt_number,
"isis_interface",
dry_run=False,
)
return {
......
......@@ -243,7 +243,12 @@ def provision_ip_trunk_iface_dry(
) -> State:
"""Perform a dry run of deploying configuration on both sides of the trunk."""
provisioning_proxy.provision_ip_trunk(
subscription, process_id, callback_route, tt_number, "trunk_interface", dry_run=True,
subscription,
process_id,
callback_route,
tt_number,
"trunk_interface",
dry_run=True,
)
return {"subscription": subscription}
......@@ -258,7 +263,12 @@ def provision_ip_trunk_iface_real(
) -> State:
"""Deploy IP trunk configuration on both sides."""
provisioning_proxy.provision_ip_trunk(
subscription, process_id, callback_route, tt_number, "trunk_interface", dry_run=False,
subscription,
process_id,
callback_route,
tt_number,
"trunk_interface",
dry_run=False,
)
return {"subscription": subscription}
......@@ -299,7 +309,12 @@ def provision_ip_trunk_isis_iface_real(
) -> State:
"""Deploy :term:`ISIS` configuration on both sides."""
provisioning_proxy.provision_ip_trunk(
subscription, process_id, callback_route, tt_number, "isis_interface", dry_run=False,
subscription,
process_id,
callback_route,
tt_number,
"isis_interface",
dry_run=False,
)
return {"subscription": subscription}
......
......@@ -296,13 +296,13 @@ def deploy_new_config_real(
@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM)
def confirm_continue_move_fiber() -> FormGenerator:
"""Wait for confirmation from an operator that the physical fiber has been moved."""
class ProvisioningResultPage(FormPage):
class Config:
title = "Please confirm before continuing"
info_label: Label = (
"New trunk interface has been deployed, "
"wait for the physical connection to be moved." # type: ignore[assignment]
"New trunk interface has been deployed, wait for the physical connection to be moved." # type: ignore[assignment]
)
yield ProvisioningResultPage
......@@ -349,6 +349,7 @@ def deploy_new_isis(
@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM)
def confirm_continue_restore_isis() -> FormGenerator:
"""Wait for an operator to confirm that the old :term:`ISIS` metric should be restored."""
class ProvisioningResultPage(FormPage):
class Config:
title = "Please confirm before continuing"
......@@ -373,7 +374,12 @@ def restore_isis_metric(
"""Restore the :term:`ISIS` metric to its original value."""
subscription.iptrunk.iptrunk_isis_metric = old_isis_metric
provisioning_proxy.provision_ip_trunk(
subscription, process_id, callback_route, tt_number, "isis_interface", dry_run=False,
subscription,
process_id,
callback_route,
tt_number,
"isis_interface",
dry_run=False,
)
return {"subscription": subscription}
......
......@@ -55,7 +55,12 @@ def provision_ip_trunk_isis_iface_real(
) -> State:
"""Deploy the new :term:`ISIS` metric on both sides of the trunk."""
provisioning_proxy.provision_ip_trunk(
subscription, process_id, callback_route, tt_number, "isis_interface", dry_run=False,
subscription,
process_id,
callback_route,
tt_number,
"isis_interface",
dry_run=False,
)
return {"subscription": subscription}
......
......@@ -25,6 +25,7 @@ from gso.utils.helpers import set_isis_to_90000
def initial_input_form_generator() -> FormGenerator:
"""Ask the operator to confirm whether router configuration and/or IPAM resources should be deleted."""
class TerminateForm(FormPage):
termination_label: Label = (
"Please confirm whether configuration should get removed from the A and B sides of the trunk, and whether "
......@@ -50,7 +51,12 @@ def drain_traffic_from_ip_trunk(
XXX: Should this not be done with the isis-90k-step?
"""
provisioning_proxy.provision_ip_trunk(
subscription, process_id, callback_route, tt_number, "isis_interface", dry_run=False,
subscription,
process_id,
callback_route,
tt_number,
"isis_interface",
dry_run=False,
)
return {"subscription": subscription}
......
......@@ -39,6 +39,7 @@ def _site_selector() -> Choice:
def initial_input_form_generator(product_name: str) -> FormGenerator:
"""Gather information about the new router from the operator."""
class CreateRouterForm(FormPage):
class Config:
title = product_name
......
......@@ -23,6 +23,7 @@ from gso.utils.helpers import (
def initial_input_form_generator(product_name: str) -> FormGenerator:
"""Get input from the operator about the new site subscription."""
class CreateSiteForm(FormPage):
class Config:
title = product_name
......
......@@ -50,6 +50,7 @@ def create_subscription(customer: str) -> State:
def initial_input_form_generator() -> FormGenerator:
"""Generate a form that is filled in using information passed through the :term:`API` endpoint."""
class ImportRouter(FormPage):
class Config:
title = "Import Router"
......
......@@ -34,6 +34,7 @@ def create_subscription(customer: str) -> State:
def generate_initial_input_form() -> FormGenerator:
"""Generate a form that is filled in using information passed through the :term:`API` endpoint."""
class ImportSite(FormPage):
class Config:
title = "Import Site"
......
......@@ -214,14 +214,14 @@ def run_migrations(db_uri: str) -> None:
version_locations = alembic_cfg.get_main_option("version_locations")
alembic_cfg.set_main_option(
"version_locations",
f"{version_locations} {os.path.dirname(orchestrator.__file__)}/migrations/versions/schema",
f"{version_locations} {Path(orchestrator.__file__).parent}/migrations/versions/schema",
)
command.upgrade(alembic_cfg, "heads")
@pytest.fixture(scope="session")
def database(db_uri):
def _database(db_uri):
"""Create database and run migrations and cleanup after wards.
Args:
......@@ -257,13 +257,13 @@ def database(db_uri):
conn.execute(text("COMMIT;"))
# Terminate all connections to the database
conn.execute(
text(f"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='{db_to_create}';") # noqa
text(f"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='{db_to_create}';"), # noqa: S608
)
conn.execute(text(f'DROP DATABASE IF EXISTS "{db_to_create}";'))
@pytest.fixture(autouse=True)
def db_session(database):
def _db_session(_database):
"""Ensure that tests are executed within a transactional scope that automatically rolls back after completion.
This fixture facilitates a pattern known as 'transactional tests'. At the start, it establishes a connection and
......@@ -286,7 +286,10 @@ def db_session(database):
with contextlib.closing(db.wrapped_database.engine.connect()) as test_connection:
# Create a new session factory for this context.
session_factory = sessionmaker(bind=test_connection, **SESSION_ARGUMENTS)
scoped_session_instance = scoped_session(session_factory, scopefunc=db.wrapped_database._scopefunc)
scoped_session_instance = scoped_session(
session_factory,
scopefunc=db.wrapped_database._scopefunc, # noqa: SLF001
)
# Point the database session to this new scoped session.
db.wrapped_database.session_factory = session_factory
......
......@@ -90,7 +90,7 @@ def router_subscription_factory(site_subscription_factory, faker):
router_vendor=RouterVendor.NOKIA,
router_role=RouterRole.PE,
router_site=None,
router_is_ias_connected=True,
router_is_ias_connected=True, # noqa: FBT002
status: SubscriptionLifecycle | None = None,
) -> UUIDstr:
description = description or faker.text(max_nb_chars=30)
......
from test.fixtures import ( # noqa
from test.fixtures import ( # noqa: F401
iptrunk_side_subscription_factory,
iptrunk_subscription_factory,
router_subscription_factory,
......
......@@ -49,6 +49,5 @@ def test_longitude(input_value, is_valid):
if is_valid:
assert LongitudeCoordinate.validate(input_value) == input_value
else:
with pytest.raises(ValueError) as excinfo:
with pytest.raises(ValueError, match="Invalid longitude coordinate"):
LongitudeCoordinate.validate(input_value)
assert "Invalid longitude coordinate" in str(excinfo.value)
......@@ -158,9 +158,8 @@ def test_reserve_interface_exception(mock_api, device, interface, data_config_fi
mock_api.return_value.dcim.interfaces.get.return_value = interface
# Check exception
with pytest.raises(WorkflowStateError) as test_exception:
with pytest.raises(WorkflowStateError, match=exception_message):
NetboxClient().reserve_interface(device.name, interface.name)
assert str(test_exception.value) == exception_message
@patch("gso.services.netbox_client.pynetbox.api")
......@@ -207,9 +206,8 @@ def test_allocate_interface_exception(mock_api, device, interface, data_config_f
mock_api.return_value.dcim.interfaces.get.return_value = interface
# Check exception
with pytest.raises(WorkflowStateError) as test_exception:
with pytest.raises(WorkflowStateError, match=exception_message):
NetboxClient().allocate_interface(device.name, interface.name)
assert str(test_exception.value) == exception_message
@patch("gso.services.netbox_client.pynetbox.api")
......@@ -317,4 +315,4 @@ def test_free_interface(mock_api, device, interface):
cleared_interface = netbox_client.free_interface(device_name, interface_name)
assert cleared_interface.enabled is False
assert cleared_interface.mark_connected is False
assert cleared_interface.description == ""
assert not cleared_interface.description
from test.fixtures import router_subscription_factory, site_subscription_factory # noqa
from test.fixtures import router_subscription_factory, site_subscription_factory # noqa: F401
......@@ -223,7 +223,7 @@ def resume_workflow(
nr_of_steps_done = len(persistent)
remaining_steps = process.workflow.steps[nr_of_steps_done:]
if step_log and step_log[-1][1].issuspend():
if step_log and step_log[-1][1].issuspend(): # noqa: SIM114
_, current_state = step_log[-1]
elif step_log and step_log[-1][1].isawaitingcallback():
_, current_state = step_log[-1]
......
import pytest
from urllib3_mock import Responses
from test.fixtures import ( # noqa
from test.fixtures import ( # noqa: F401
iptrunk_side_subscription_factory,
iptrunk_subscription_factory,
router_subscription_factory,
......@@ -14,7 +14,7 @@ def responses():
responses_mock = Responses("requests.packages.urllib3")
def _find_request(call):
mock_url = responses_mock._find_match(call.request)
mock_url = responses_mock._find_match(call.request) # noqa: SLF001
if not mock_url:
pytest.fail(f"Call not mocked: {call.request}")
return mock_url
......@@ -25,7 +25,7 @@ def responses():
with responses_mock:
yield responses_mock
mocked_urls = map(_to_tuple, responses_mock._urls)
mocked_urls = map(_to_tuple, responses_mock._urls) # noqa: SLF001
used_urls = map(_to_tuple, map(_find_request, responses_mock.calls))
not_used = set(mocked_urls) - set(used_urls)
if not_used:
......
......@@ -19,7 +19,7 @@ from test.workflows import (
@pytest.fixture()
def netbox_client_mock():
def _netbox_client_mock():
# Mock NetboxClient methods
with (
patch("gso.services.netbox_client.NetboxClient.get_device_by_name") as mock_get_device_by_name,
......@@ -90,6 +90,7 @@ def input_form_wizard_data(router_subscription_factory, faker):
@pytest.mark.workflow()
@pytest.mark.usefixtures(_netbox_client_mock)
@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.check_ip_trunk")
@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.provision_ip_trunk")
@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v6_network")
......@@ -103,7 +104,6 @@ def test_successful_iptrunk_creation_with_standard_lso_result(
input_form_wizard_data,
faker,
data_config_filename: PathLike,
netbox_client_mock,
test_client,
):
mock_allocate_v4_network.return_value = faker.ipv4_network()
......@@ -129,6 +129,7 @@ def test_successful_iptrunk_creation_with_standard_lso_result(
@pytest.mark.workflow()
@pytest.mark.usefixtures(_netbox_client_mock)
@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.check_ip_trunk")
@patch("gso.workflows.iptrunk.create_iptrunk.provisioning_proxy.provision_ip_trunk")
@patch("gso.workflows.iptrunk.create_iptrunk.infoblox.allocate_v6_network")
......@@ -141,7 +142,6 @@ def test_iptrunk_creation_fails_when_lso_return_code_is_one(
responses,
input_form_wizard_data,
faker,
netbox_client_mock,
data_config_filename: PathLike,
):
mock_allocate_v4_network.return_value = faker.ipv4_network()
......@@ -156,3 +156,4 @@ def test_iptrunk_creation_fails_when_lso_return_code_is_one(
assert_pp_interaction_failure(result, process_stat, step_log)
assert mock_check_ip_trunk.call_count == 0
assert mock_provision_ip_trunk.call_count == 1
......@@ -70,6 +70,5 @@ def test_site_name_is_incorrect(responses, faker):
},
]
with pytest.raises(FormValidationError) as test_exception:
result, process, step_log = run_workflow("create_site", initial_site_data)
assert str(test_exception.value) == expected_exception_msg
with pytest.raises(FormValidationError, match=expected_exception_msg):
run_workflow("create_site", initial_site_data)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment