Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • goat/gap/geant-service-orchestrator
1 result
Select Git revision
Show changes
Commits on Source (6)
......@@ -47,8 +47,10 @@ def init_cli_app() -> typer.Typer:
def init_sentry() -> None:
"""Only initialize Sentry if not in testing mode."""
if os.getenv("TESTING", "false").lower() == "false" and (sentry_config := load_oss_params().SENTRY):
sentry_sdk.init(dsn=sentry_config.DSN, environment=sentry_config.environment, traces_sample_rate=1.0)
if os.getenv("TESTING", "false").lower() == "false" and (sentry_params := load_oss_params().SENTRY):
sentry_sdk.init(
dsn=sentry_params.DSN, environment=load_oss_params().GENERAL.environment, traces_sample_rate=1.0
)
init_sentry()
{
"GENERAL": {
"public_hostname": "https://gap.geant.org",
"isis_high_metric": 999999
"internal_hostname": "http://gso-api:9000",
"isis_high_metric": 999999,
"environment": "development"
},
"NETBOX": {
"api": "https://127.0.0.1:8000",
......@@ -119,7 +121,6 @@
"md5_password": "snmp password"
},
"SENTRY": {
"DSN": "https://sentry-dsn-url",
"environment": "development"
"DSN": "https://sentry-dsn-url"
}
}
......@@ -37,7 +37,7 @@ def _send_request(parameters: dict, callback_route: str) -> None:
params = oss.PROVISIONING_PROXY
# Build up a callback URL of the Provisioning Proxy to return its results to.
callback_url = f"{oss.GENERAL.public_hostname}{callback_route}"
callback_url = f"{oss.GENERAL.internal_hostname}{callback_route}"
debug_msg = f"[provisioning proxy] Callback URL set to {callback_url}"
logger.debug(debug_msg)
......
......@@ -21,13 +21,24 @@ from gso.utils.shared_enums import PortNumber
logger = logging.getLogger(__name__)
class EnvironmentEnum(strEnum):
"""The different environments in which the GSO system can run."""
DEVELOPMENT = "development"
TEST = "test"
UAT = "uat"
PRODUCTION = "production"
class GeneralParams(BaseSettings):
"""General parameters for a :term:`GSO` configuration file."""
public_hostname: str
"""The hostname that :term:`GSO` is publicly served at, used for building the callback URL that the provisioning
proxy uses."""
"""The hostname that :term:`GSO` is publicly served at, used for building callback URLs for public use."""
internal_hostname: str
"""The hostname of :term:`GSO` that is for internal use, such as the provisioning proxy."""
isis_high_metric: int
environment: EnvironmentEnum
class CeleryParams(BaseSettings):
......@@ -192,20 +203,10 @@ class KentikParams(BaseSettings):
md5_password: str
class EnvironmentEnum(strEnum):
"""The different environments in which the GSO system can run."""
DEVELOPMENT = "development"
TEST = "test"
UAT = "uat"
PRODUCTION = "production"
class SentryParams(BaseSettings):
"""Settings for Sentry."""
DSN: str
environment: EnvironmentEnum
class OSSParams(BaseSettings):
......
......@@ -5,7 +5,7 @@ import json
from orchestrator.targets import Target
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.utils.json import json_dumps
from orchestrator.workflow import StepList, begin, done, step, workflow
from orchestrator.workflow import StepList, begin, conditional, done, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
......@@ -192,10 +192,20 @@ def validate_iptrunk() -> StepList:
* Verify the configuration on both sides of the trunk is intact.
* Check the ISIS metric of the trunk.
* Verify that TWAMP configuration is correct.
If a trunk has a Juniper router on both sides, it is considered legacy and does not require validation.
"""
skip_legacy_trunks = conditional(
lambda state: all(
side.iptrunk_side_node.vendor == Vendor.JUNIPER
for side in Iptrunk.from_subscription(state["subscription_id"]).iptrunk.iptrunk_sides
)
)
return (
begin
>> store_process_subscription(Target.SYSTEM)
>> skip_legacy_trunks(done)
>> unsync
>> verify_ipam_records
>> verify_netbox_entries
......
......@@ -18,10 +18,10 @@ def gather_failed_tasks() -> State:
@step("Send notification emails for all failed tasks")
def send_email_notifications(state: State) -> None:
"""Send out an email notification for all tasks that have failed."""
base_url = load_oss_params().GENERAL.public_hostname
general_settings = load_oss_params().GENERAL
all_alerts = ""
for failure in state["failed_tasks"]:
failed_task_url = f"{base_url}/workflows/{failure["process_id"]}"
failed_task_url = f"{general_settings.public_hostname}/workflows/{failure["process_id"]}"
failed_subscription = get_subscription_by_process_id(failure["process_id"])
all_alerts = f"{all_alerts}------\n\n"
if failed_subscription:
......@@ -36,7 +36,7 @@ def send_email_notifications(state: State) -> None:
)
send_mail(
"GAP - One or more tasks have failed!",
f"GAP {general_settings.environment} environment - One or more tasks have failed!",
(
f"Please check the following tasks in GAP which have failed.\n\n{all_alerts}------"
f"\n\nRegards, the GÉANT Automation Platform.\n\n"
......
......@@ -4,6 +4,7 @@ import pytest
from infoblox_client import objects
from gso.products.product_types.iptrunk import Iptrunk
from test.conftest import UseJuniperSide
from test.services.conftest import MockedNetboxClient
from test.workflows import (
assert_complete,
......@@ -13,6 +14,31 @@ from test.workflows import (
)
@pytest.fixture()
def trunk_validation_setup(
request,
iptrunk_subscription_factory,
juniper_router_subscription_factory,
nokia_router_subscription_factory,
iptrunk_side_subscription_factory,
):
if request.param == UseJuniperSide.SIDE_A:
# Nokia -> Juniper
side_a = iptrunk_side_subscription_factory(iptrunk_side_node=nokia_router_subscription_factory())
side_b = iptrunk_side_subscription_factory(iptrunk_side_node=juniper_router_subscription_factory())
elif request.param == UseJuniperSide.SIDE_B:
# Juniper -> Nokia
side_a = iptrunk_side_subscription_factory(iptrunk_side_node=juniper_router_subscription_factory())
side_b = iptrunk_side_subscription_factory(iptrunk_side_node=nokia_router_subscription_factory())
else:
# Nokia -> Nokia
side_a = iptrunk_side_subscription_factory(iptrunk_side_node=nokia_router_subscription_factory())
side_b = iptrunk_side_subscription_factory(iptrunk_side_node=nokia_router_subscription_factory())
subscription_id = iptrunk_subscription_factory(iptrunk_sides=[side_a, side_b])
return [{"subscription_id": subscription_id}]
@pytest.fixture()
def _mocked_netbox_client():
with (
......@@ -35,6 +61,9 @@ def _mocked_netbox_client():
yield
@pytest.mark.parametrize(
"trunk_validation_setup", [UseJuniperSide.NONE, UseJuniperSide.SIDE_A, UseJuniperSide.SIDE_B], indirect=True
)
@pytest.mark.workflow()
@pytest.mark.usefixtures("_mocked_netbox_client")
@patch("gso.services.infoblox.find_network_by_cidr")
......@@ -50,10 +79,10 @@ def test_validate_iptrunk_success(
mock_find_network_by_cidr,
faker,
data_config_filename,
iptrunk_subscription_factory,
trunk_validation_setup,
):
# Mock value setup
subscription_id = iptrunk_subscription_factory()
subscription_id = trunk_validation_setup[0]["subscription_id"]
trunk = Iptrunk.from_subscription(subscription_id).iptrunk
mock_find_network_by_cidr.side_effects = [
objects.Network(connector=None, ipv4addrs=[trunk.iptrunk_ipv4_network]),
......@@ -183,3 +212,44 @@ def test_validate_iptrunk_success(
assert mock_find_host_by_fqdn.call_count == 2
assert mock_find_v6_host_by_fqdn.call_count == 2
assert mock_find_network_by_cidr.call_count == 2
@pytest.mark.workflow()
@pytest.mark.usefixtures("_mocked_netbox_client")
@patch("gso.services.infoblox.find_network_by_cidr")
@patch("gso.services.infoblox.find_v6_host_by_fqdn")
@patch("gso.services.infoblox.find_host_by_fqdn")
@patch("gso.workflows.iptrunk.validate_iptrunk.execute_playbook")
@patch("gso.services.netbox_client.NetboxClient.get_interface_by_name_and_device")
def test_validate_iptrunk_skip_legacy_trunks(
mock_get_interface_by_name,
mock_validate_iptrunk,
mock_find_host_by_fqdn,
mock_find_v6_host_by_fqdn,
mock_find_network_by_cidr,
faker,
data_config_filename,
iptrunk_subscription_factory,
iptrunk_side_subscription_factory,
juniper_router_subscription_factory,
):
# Mock value setup
side_a = iptrunk_side_subscription_factory(iptrunk_side_node=juniper_router_subscription_factory())
side_b = iptrunk_side_subscription_factory(iptrunk_side_node=juniper_router_subscription_factory())
subscription_id = iptrunk_subscription_factory(iptrunk_sides=[side_a, side_b])
# Run workflow
initial_router_data = [{"subscription_id": subscription_id}]
result, _, _ = run_workflow("validate_iptrunk", initial_router_data)
state = extract_state(result)
assert_complete(result)
subscription_id = state["subscription_id"]
subscription = Iptrunk.from_subscription(subscription_id)
assert subscription.status == "active"
assert mock_get_interface_by_name.call_count == 0
assert mock_validate_iptrunk.call_count == 0
assert mock_find_host_by_fqdn.call_count == 0
assert mock_find_v6_host_by_fqdn.call_count == 0
assert mock_find_network_by_cidr.call_count == 0