Skip to content
Snippets Groups Projects
Commit 8846264d authored by Pelle Koster's avatar Pelle Koster
Browse files

Refactor inventory requests and add support for authentication

parent 0f94cd82
No related branches found
No related tags found
1 merge request!19Refactor inventory requests and add support for authentication
...@@ -5,6 +5,7 @@ from enum import Enum, auto ...@@ -5,6 +5,7 @@ from enum import Enum, auto
from functools import reduce from functools import reduce
from typing import Dict, List from typing import Dict, List
from brian_dashboard_manager.exception import BrianHTTPError
import jsonschema import jsonschema
import requests import requests
from requests.exceptions import HTTPError from requests.exceptions import HTTPError
...@@ -194,7 +195,7 @@ class BrianData: ...@@ -194,7 +195,7 @@ class BrianData:
gws_indirect_data: List[dict] gws_indirect_data: List[dict]
gws_direct_data: List[dict] gws_direct_data: List[dict]
eumetsat_multicast_data: List[dict] eumetsat_multicast_data: List[dict]
datasource_name: str datasource_name: str | None = None
def get_customer_services( def get_customer_services(
self, self,
...@@ -217,12 +218,85 @@ class BrianData: ...@@ -217,12 +218,85 @@ class BrianData:
customers[cust].append(service) customers[cust].append(service)
return customers return customers
@classmethod
def fetch_using_config(self, config: dict):
inventory_provider = HttpClient(
base_url=config["inventory_provider"],
api_token=config.get("inventory_api_key"),
)
reporting_provider = HttpClient(
base_url=config["reporting_provider"],
)
return BrianData(
services=get_services(reporting_provider),
interfaces=get_interfaces(inventory_provider),
regions=get_nren_regions(inventory_provider),
gws_direct_data=get_gws_direct(inventory_provider),
gws_indirect_data=get_gws_indirect(inventory_provider),
eumetsat_multicast_data=get_eumetsat_multicast_subscriptions(
inventory_provider
),
)
class HttpClient:
def __init__(self, base_url, api_token=None):
self.base_url = base_url
self.api_token = api_token
def get_json(self, url, schema=None):
try:
response = requests.get(
self.base_url + url, timeout=5, headers=self._get_headers()
)
response.raise_for_status()
except HTTPError as e:
raise BrianHTTPError(e.response, f"Failed to retrieve data from {url}")
result = response.json()
if schema:
jsonschema.validate(result, schema)
return result
def _get_headers(self):
if self.api_token:
return {"Authorization": f"ApiKey {self.api_token}"}
def get_interfaces(client: HttpClient):
"""
Get all interfaces that have dashboards assigned to them.
:param client: A HttpClient that contains the base url and authentication token
:return: A list of interfaces with IP information added, if present.
"""
interfaces = client.get_json("/poller/interfaces", INTERFACE_LIST_SCHEMA) or []
ip_info = _get_ip_info(client)
def enrich(interface):
router_name = interface.get("router")
router = ip_info.get(router_name) or {}
ip = router.get(interface["name"]) or {}
interface["ipv4"] = ip.get("ipv4") or []
interface["ipv6"] = ip.get("ipv6") or []
def _get_ip_info(host): if "dashboards_info" in interface:
interface["dashboards_info"] = list(
filter(lambda x: x["name"] != "", interface["dashboards_info"])
)
return interface
filtered = filter(lambda i: len(i["dashboards"]) > 0, interfaces)
return list(map(enrich, filtered))
def _get_ip_info(client: HttpClient):
""" """
Get IP information for all interfaces on all routers. Get IP information for all interfaces on all routers.
:param host: Hostname to perform the request to. :param client: A HttpClient that contains the base url and authentication token
:return: A lookup table of the form: :return: A lookup table of the form:
{ {
'router1': { 'router1': {
...@@ -279,180 +353,60 @@ def _get_ip_info(host): ...@@ -279,180 +353,60 @@ def _get_ip_info(host):
return prev return prev
try: interfaces = (
r = requests.get(f"{host}/data/interfaces", timeout=5) client.get_json("/data/interfaces", schema=ROUTER_INTERFACES_SCHEMA) or []
r.raise_for_status() )
interfaces = r.json()
except HTTPError:
logger.exception("Failed to get IP info")
interfaces = []
jsonschema.validate(interfaces, ROUTER_INTERFACES_SCHEMA)
return reduce(reduce_func, interfaces, {}) return reduce(reduce_func, interfaces, {})
def get_interfaces(host): def get_gws_direct(client: HttpClient):
"""
Get all interfaces that have dashboards assigned to them.
:param host: Hostname to perform the request to.
:return: A list of interfaces with IP information added, if present.
"""
r = requests.get(f"{host}/poller/interfaces", timeout=5)
try:
r.raise_for_status()
interfaces = r.json()
except HTTPError:
logger.exception("Failed to get interfaces")
return []
if "vlan_type" not in interfaces[0]:
# inventory-provider changes are a bit slow, so do it on this side until it's released
ports_and_vlans = defaultdict(lambda: defaultdict(list))
for ifc in interfaces:
router = ifc["router"]
name = ifc["name"]
if "." in name:
name = name.split(".")[0]
ports_and_vlans[router][name].append(ifc)
# Add interface_type to each interface
# It's used as a filter in the dashboard manager to determine which interfaces are trunks & has VLANs
for router, ifcs in ports_and_vlans.items():
for base_ifc, ifc_list in ifcs.items():
if len(ifc_list) == 1:
ifc_list[0]["vlan_type"] = VLAN_TYPES.ACCESS.name
continue
ifc_list.sort(key=lambda x: x["name"])
base = ifc_list.pop(0)
if base["name"] != base_ifc:
base["vlan_type"] = VLAN_TYPES.VLAN.name
else:
base["vlan_type"] = VLAN_TYPES.TRUNK.name
for ifc in ifc_list:
ifc["vlan_type"] = VLAN_TYPES.VLAN.name
jsonschema.validate(interfaces, INTERFACE_LIST_SCHEMA)
ip_info = _get_ip_info(host)
def enrich(interface):
router_name = interface.get("router")
router = ip_info.get(router_name)
if not router:
return interface
ip = router.get(interface["name"])
if not ip:
return interface
ipv4 = ip["ipv4"]
ipv6 = ip["ipv6"]
interface["ipv4"] = ipv4
interface["ipv6"] = ipv6
if "dashboards_info" in interface:
interface["dashboards_info"] = list(
filter(lambda x: x["name"] != "", interface["dashboards_info"])
)
return interface
filtered = filter(lambda i: len(i["dashboards"]) > 0, interfaces)
enriched = list(map(enrich, filtered))
return enriched
def get_gws_direct(host):
""" """
Get all GWS Direct data. Get all GWS Direct data.
Follows the schema defined in GWS_DIRECT_DATA_SCHEMA. Follows the schema defined in GWS_DIRECT_DATA_SCHEMA.
:param host: Hostname to perform the request to. :param client: A HttpClient that contains the base url and authentication token
:return: GWS direct data :return: GWS direct data
""" """
return client.get_json("/poller/gws/direct", GWS_DIRECT_DATA_SCHEMA)
r = requests.get(f"{host}/poller/gws/direct", timeout=5)
try:
r.raise_for_status()
interfaces = r.json()
except HTTPError:
logger.exception("Failed to get GWS direct data")
interfaces = []
jsonschema.validate(interfaces, GWS_DIRECT_DATA_SCHEMA)
return interfaces
def get_gws_indirect(host): def get_gws_indirect(client: HttpClient):
""" """
Get all GWS Indirect data. Get all GWS Indirect data.
:param host: Hostname to perform the request to. :param client: A HttpClient that contains the base url and authentication token
:return: GWS Indirect data :return: GWS Indirect data
""" """
try: return client.get_json("/poller/gws/indirect")
r = requests.get(f"{host}/poller/gws/indirect", timeout=5)
r.raise_for_status()
interfaces = r.json()
except HTTPError:
logger.exception("Failed to get GWS indirect data")
interfaces = []
return interfaces
def get_eumetsat_multicast_subscriptions(host): def get_eumetsat_multicast_subscriptions(client: HttpClient):
""" """
Get all EUMETSAT multicast subscriptions. Get all EUMETSAT multicast subscriptions.
:param host: Hostname to perform the request to. :param client: A HttpClient that contains the base url and authentication token
:return: EUMETSAT multicast subscriptions :return: EUMETSAT multicast subscriptions
""" """
try: return client.get_json(
r = requests.get(f"{host}/poller/eumetsat-multicast", timeout=5) "/poller/eumetsat-multicast", MULTICAST_SUBSCRIPTION_LIST_SCHEMA
r.raise_for_status() )
data = r.json()
except HTTPError:
logger.exception("Failed to get EUMETSAT multicast subscriptions")
data = []
jsonschema.validate(data, MULTICAST_SUBSCRIPTION_LIST_SCHEMA)
return data
def get_nren_regions(client: HttpClient):
def get_nren_regions(host):
""" """
Get all NREN regions, where specified. Get all NREN regions, where specified.
:param host: Hostname to perform the request to. :param client: A HttpClient that contains the base url and authentication token
:return: A list of NRENs and regions. :return: A list of NRENs and regions.
""" """
try: result = client.get_json("/poller/regions", NREN_REGION_LIST_SCHEMA)
r = requests.get(f"{host}/poller/regions", timeout=5) return {region["nren"]: region["region"].upper() for region in result}
r.raise_for_status()
data = r.json()
except HTTPError:
logger.exception("Failed to get NREN regions")
data = []
jsonschema.validate(data, NREN_REGION_LIST_SCHEMA)
return {region["nren"]: region["region"].upper() for region in data}
def get_services(host): def get_services(client: HttpClient):
""" """
Fetches the current service state from the Reporting Provider host Fetches the current service state from the Reporting Provider host
:param client: A HttpClient that contains the base url
""" """
try: return client.get_json("/scid/current")
r = requests.get(f'{host}/scid/current', timeout=5)
r.raise_for_status()
services = r.json()
except requests.exceptions.HTTPError:
logger.exception('Error when fetching services:')
services = []
return services
...@@ -47,14 +47,6 @@ from enum import Enum ...@@ -47,14 +47,6 @@ from enum import Enum
from typing import Dict, List, Optional, Sequence from typing import Dict, List, Optional, Sequence
from brian_dashboard_manager.config import DEFAULT_ORGANIZATIONS from brian_dashboard_manager.config import DEFAULT_ORGANIZATIONS
from brian_dashboard_manager.data import (
get_eumetsat_multicast_subscriptions,
get_gws_direct,
get_gws_indirect,
get_interfaces,
get_nren_regions,
get_services,
)
from brian_dashboard_manager.grafana.dashboard import create_dashboard from brian_dashboard_manager.grafana.dashboard import create_dashboard
from brian_dashboard_manager.grafana.dashboard import ( from brian_dashboard_manager.grafana.dashboard import (
delete_dashboard_by_uid as delete_dashboard_by_uid_, delete_dashboard_by_uid as delete_dashboard_by_uid_,
...@@ -553,22 +545,7 @@ def provision(config): ...@@ -553,22 +545,7 @@ def provision(config):
start = time.time() start = time.time()
logger.info("Fetching data...") logger.info("Fetching data...")
interfaces = get_interfaces(config["inventory_provider"]) data = BrianData.fetch_using_config(config)
services = get_services(config["reporting_provider"])
regions = get_nren_regions(config["inventory_provider"])
gws_direct = get_gws_direct(config["inventory_provider"])
gws_indirect = get_gws_indirect(config["inventory_provider"])
eumetsat = get_eumetsat_multicast_subscriptions(config["inventory_provider"])
data = BrianData(
interfaces=interfaces,
services=services,
regions=regions,
gws_direct_data=gws_direct,
gws_indirect_data=gws_indirect,
eumetsat_multicast_data=eumetsat,
datasource_name=None,
)
request = AdminRequest(**config) request = AdminRequest(**config)
to_populate = provision_organizations( to_populate = provision_organizations(
......
...@@ -6,6 +6,7 @@ import pathlib ...@@ -6,6 +6,7 @@ import pathlib
import re import re
import string import string
import threading import threading
from brian_dashboard_manager.data import VLAN_TYPES
from brian_dashboard_manager.provision.strategies import BrianData, Organization from brian_dashboard_manager.provision.strategies import BrianData, Organization
import pytest import pytest
import brian_dashboard_manager import brian_dashboard_manager
...@@ -811,8 +812,37 @@ def interfaces(): ...@@ -811,8 +812,37 @@ def interfaces():
}, },
] ]
for ifc in result: for ifc in result:
ifc.setdefault('ipv4', []) ifc.setdefault("ipv4", [])
ifc.setdefault('ipv6', []) ifc.setdefault("ipv6", [])
# The test data does not all have a "vlan_type", we should add it if it's not there
ports_and_vlans = {}
for ifc in result:
router = ifc["router"]
name = ifc["name"]
if "." in name:
name = name.split(".")[0]
ports_and_vlans.setdefault(router, {}).setdefault(name, []).append(ifc)
for router, ifcs in ports_and_vlans.items():
for base_ifc, ifc_list in ifcs.items():
if len(ifc_list) == 1:
ifc_list[0].setdefault("vlan_type", VLAN_TYPES.ACCESS.name)
continue
ifc_list.sort(key=lambda x: x["name"])
base = ifc_list.pop(0)
if base["name"] != base_ifc:
base.setdefault("vlan_type", VLAN_TYPES.VLAN.name)
else:
base.setdefault("vlan_type", VLAN_TYPES.TRUNK.name)
for ifc in ifc_list:
ifc.setdefault("vlan_type", VLAN_TYPES.VLAN.name)
return result return result
......
from brian_dashboard_manager.data import HttpClient, get_gws_direct
import responses
from responses import matchers
@responses.activate
def test_inprov_authentication():
mock = responses.get(
url="http://some.url/poller/gws/direct",
json=[],
match=[matchers.header_matcher({"Authorization": "ApiKey some-token"})],
)
client = HttpClient(base_url="http://some.url", api_token="some-token")
get_gws_direct(client)
assert mock.call_count == 1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment