# gap.py
import logging
from typing import Optional

import requests
from flask import current_app

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Form fields sent by get_token() as `grant_type` and `scope` in every token request.
GRANT_TYPE = 'client_credentials'
SCOPE = 'openid profile email aarc'


def get_token_endpoint(discovery_endpoint_url: str, *, timeout: float = 10.0) -> str:
    """Return the token endpoint advertised by a discovery document.

    Fetches ``discovery_endpoint_url`` with a GET request and returns the
    ``token_endpoint`` entry of the JSON body.

    Args:
        discovery_endpoint_url: URL of the discovery document to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, so an unresponsive
            endpoint would block the caller forever.

    Returns:
        The value of ``token_endpoint`` in the discovery document.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer within ``timeout``.
        KeyError: If the document has no ``token_endpoint`` entry.
    """
    response = requests.get(discovery_endpoint_url, timeout=timeout)
    response.raise_for_status()
    return response.json()['token_endpoint']


def get_token(aai_config: dict, *, timeout: float = 10.0) -> str:
    """Get an access token using the given configuration.

    The token endpoint is looked up through
    ``aai_config['discovery_endpoint_url']``; the token is then requested
    with the module-level ``GRANT_TYPE`` and ``SCOPE`` and the client
    credentials stored under ``aai_config['inventory_provider']``.

    Args:
        aai_config: Mapping with a ``discovery_endpoint_url`` entry and an
            ``inventory_provider`` mapping holding ``client_id`` and
            ``secret``.
        timeout: Seconds to wait for the token endpoint before giving up.
            Without a timeout ``requests`` waits indefinitely.

    Returns:
        The ``access_token`` value of the JSON response.

    Raises:
        requests.HTTPError: If the token endpoint answers with an error status.
        requests.Timeout: If the token endpoint does not answer in time.
        KeyError: If the configuration or the response lacks an expected key.
    """
    client = aai_config['inventory_provider']
    token_endpoint = get_token_endpoint(aai_config['discovery_endpoint_url'])
    response = requests.post(
        token_endpoint,
        data={
            'grant_type': GRANT_TYPE,
            'scope': SCOPE,
            'client_id': client['client_id'],
            'client_secret': client['secret'],
        },
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()['access_token']


def make_request(body: dict, *, timeout: float = 60.0) -> dict:
    """Make a request to the orchestrator using the given body.

    Reads ``INVENTORY_PROVIDER_CONFIG`` from the current Flask app, obtains a
    bearer token via ``get_token`` and POSTs ``body`` as JSON to
    ``<orchestrator url>/api/graphql``.

    NOTE(review): a new token is requested on every call; callers that issue
    many requests in a row pay for a token round-trip each time.

    Args:
        body: JSON-serialisable request body (e.g. ``{'query': ...}``).
        timeout: Seconds to wait for the orchestrator before giving up.
            Without a timeout ``requests`` waits indefinitely.

    Returns:
        The decoded JSON response.

    Raises:
        requests.HTTPError: If the orchestrator answers with an error status.
        requests.Timeout: If the orchestrator does not answer in time.
        KeyError: If the configuration lacks an expected key.
    """
    config = current_app.config['INVENTORY_PROVIDER_CONFIG']
    # Tolerate a configured base URL with a trailing slash ("//api/graphql").
    base_url = config['orchestrator']['url'].rstrip('/')
    api_url = f'{base_url}/api/graphql'
    headers = {'Authorization': f'Bearer {get_token(config["aai"])}'}
    response = requests.post(api_url, headers=headers, json=body, timeout=timeout)
    response.raise_for_status()
    return response.json()


def extract_router_info(device: dict) -> Optional[dict]:
    """Look up the FQDN and vendor of a single orchestrator subscription.

    ``device`` is expected to carry ``product.tag`` and ``subscriptionId``.
    The tag selects the field-name prefix (``router``, ``office_router`` or
    ``super_pop_switch``); the subscription is then queried from the
    orchestrator and the ``<prefix>Fqdn`` and ``vendor`` values are read from
    the first product block instance of the first returned subscription.

    Args:
        device: One subscription entry with ``product`` and ``subscriptionId``.

    Returns:
        ``{'fqdn': ..., 'vendor': ...}``, or ``None`` when the tag is unknown,
        the subscription ID is missing, the orchestrator returns no data, or
        either value is absent/empty. Every ``None`` case logs a warning.
    """
    tag_to_key_map = {
        "RTR": "router",
        "OFFICE_ROUTER": "office_router",
        "Super_POP_SWITCH": "super_pop_switch"
    }

    # `product` may be present but null, in which case .get()'s default is
    # not used; fall back to an empty dict explicitly.
    tag = (device.get("product") or {}).get("tag")
    key = tag_to_key_map.get(tag)
    subscription_id = device.get("subscriptionId")

    if key is None or subscription_id is None:
        logger.warning("Skipping device with invalid tag or subscription ID: %s", device)
        return None

    query = f"""
    query {{
            subscriptions(
                filterBy: {{ field: "subscriptionId", value: "{subscription_id}" }}
            ) {{
                page {{
                    subscriptionId
                    productBlockInstances {{
                        productBlockInstanceValues
                    }}
                }}
            }}
        }}
        """

    response = make_request(body={'query': query})

    # A response may carry `"data": null` (or omit levels entirely); treat
    # any such shape as "no data" instead of raising.
    try:
        page_data = response['data']['subscriptions']['page']
    except (TypeError, KeyError):
        page_data = None

    if not page_data:
        logger.warning("No data for subscription ID: %s", subscription_id)
        return None

    # Only the first subscription and its first product block instance are
    # inspected. Missing, null and empty containers all degrade to "no values".
    subscription = page_data[0] or {}
    instances = subscription.get('productBlockInstances') or [{}]
    first_instance = instances[0] or {}
    instance_values = first_instance.get('productBlockInstanceValues') or []

    fqdn_field = f'{key}Fqdn'
    fqdn = next((item.get('value') for item in instance_values if item.get('field') == fqdn_field), None)
    vendor = next((item.get('value') for item in instance_values if item.get('field') == 'vendor'), None)

    if fqdn and vendor:
        return {'fqdn': fqdn, 'vendor': vendor}

    logger.warning("Skipping device with missing FQDN or vendor: %s", device)
    return None


def load_routers_from_orchestrator() -> dict:
    """Gets devices from the orchestrator and returns a dictionary of FQDNs and vendors.

    Queries the orchestrator for ACTIVE subscriptions tagged ``RTR``,
    ``OFFICE_ROUTER`` or ``Super_POP_SWITCH`` and resolves each one through
    ``extract_router_info``. Devices for which that returns ``None`` are
    skipped.

    Returns:
        Mapping of router FQDN to vendor. Empty when the orchestrator returns
        no devices or a response of an unexpected shape (the latter is logged).
    """

    query = """
    {
        subscriptions(
            filterBy: {field: "status", value: "ACTIVE"},
            query: "tag:(RTR|OFFICE_ROUTER|Super_POP_SWITCH)"
        ) {
            page {
                subscriptionId
                product {
                    tag
                }
            }
        }
    }
    """
    response = make_request(body={'query': query})

    try:
        devices = response['data']['subscriptions']['page']
    except (TypeError, KeyError):
        # Keep the best-effort behaviour (empty result) but leave a trace, so
        # a malformed or error response is not mistaken for "no routers".
        logger.warning("Unexpected orchestrator response, no devices loaded: %s", response)
        devices = None

    routers = {}
    # `page` may be null in the response; iterate over an empty list then.
    for device in devices or []:
        router_info = extract_router_info(device)
        if router_info is not None:
            routers[router_info['fqdn']] = router_info['vendor']
    return routers