Skip to content
Snippets Groups Projects
Commit 7183f1dd authored by Neda Moeini's avatar Neda Moeini
Browse files

Merge branch 'feature/get-graph-data-from-gap' into 'master'

Feature/get graph data from gap

See merge request geant-swd/mapping-provider!1
parents 25c14572 4762a014
No related branches found
No related tags found
1 merge request!1Feature/get graph data from gap
"""Initializes the FastAPI application.""" """Create a FastAPI application for the mapping provider."""
from fastapi import FastAPI from fastapi import FastAPI
from mapping_provider.api.common import router as version_router from mapping_provider.api.common import router as version_router
from mapping_provider.api.network_graph import router as graph_router
def create_app() -> FastAPI: def create_app() -> FastAPI:
...@@ -11,5 +12,13 @@ def create_app() -> FastAPI: ...@@ -11,5 +12,13 @@ def create_app() -> FastAPI:
title="Mapping provider", title="Mapping provider",
description="Mapping provider endpoints for GÉANT maps", description="Mapping provider endpoints for GÉANT maps",
) )
# Force configuration to be loaded at startup to avoid issues with missing config
from mapping_provider.dependencies import load_config
_ = load_config()
app.include_router(version_router) app.include_router(version_router)
app.include_router(graph_router)
return app return app
from fastapi import APIRouter
from mapping_provider.dependencies import config_dep
from mapping_provider.services.gap import load_routers, load_trunks
router = APIRouter()
@router.get("/network-graph")
def get_network_graph(config: config_dep) -> dict[str, list[str] | list[list[str]]]:
"""Network graph endpoint."""
return {"routers": load_routers(config), "trunks": load_trunks(config)}
"""The main module that runs the application.""" """The main module that runs the application."""
import uvicorn
from mapping_provider import create_app from mapping_provider import create_app
app = create_app() app = create_app()
if __name__ == "__main__":
uvicorn.run("mapping_provider.app:app", host="127.0.0.1", port=8000, reload=True)
"""Configuration file for the Mapping Provider."""
import json
from typing import Any, TextIO
import jsonschema
CONFIG_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"aai": {
"type": "object",
"properties": {
"discovery_endpoint_url": {"type": "string"},
"client_id": {"type": "string"},
"secret": {"type": "string"},
},
"required": ["client_id", "secret", "discovery_endpoint_url"],
"additionalProperties": False,
},
"orchestrator": {
"type": "object",
"properties": {
"url": {"type": "string"},
},
"required": ["url"],
"additionalProperties": False,
},
},
"required": ["aai", "orchestrator"],
"additionalProperties": False,
}
def load(f: TextIO) -> dict[str, Any]:
"""loads, validates and returns configuration parameters.
:param f: file-like object that produces the config file
:return:
"""
config: dict[str, Any] = json.loads(f.read())
jsonschema.validate(config, CONFIG_SCHEMA)
return config
"""FastAPI project dependencies."""
import os
from functools import lru_cache
from typing import Annotated, Any
from fastapi import Depends
from mapping_provider import config
@lru_cache
def load_config() -> dict[str, Any]:
"""Load and cache the application configuration."""
config_file = os.environ.get("CONFIG_FILE_NAME", "config.json")
with open(config_file) as f:
return config.load(f)
# Dependency for injecting config into routes/services
config_dep = Annotated[dict[str, Any], Depends(load_config)]
"""External services for the mapping provider."""
import logging
from collections.abc import Callable
from typing import Any
import requests
from requests import Session
from mapping_provider.dependencies import config_dep
logger = logging.getLogger(__name__)
GRANT_TYPE = "client_credentials"
SCOPE = "openid profile email aarc"
def get_token_endpoint(discovery_endpoint_url: str, session: Session) -> str:
"""Fetch the token endpoint URL from discovery document."""
response = session.get(discovery_endpoint_url)
response.raise_for_status()
data: dict[str, Any] = response.json()
return str(data["token_endpoint"])
def get_token(config: dict[str, Any], session: Session) -> str:
"""Retrieve an access token using client credentials flow."""
aai_config = config["aai"]
token_endpoint = get_token_endpoint(aai_config["discovery_endpoint_url"], session)
response = session.post(
token_endpoint,
data={
"grant_type": GRANT_TYPE,
"scope": SCOPE,
"client_id": aai_config["client_id"],
"client_secret": aai_config["secret"],
},
)
response.raise_for_status()
data: dict[str, Any] = response.json()
return str(data["access_token"])
def make_request(query: str, token: str, config: dict[str, Any], session: Session) -> dict[Any, Any]:
"""Make a GraphQL request to the orchestrator API."""
api_url = f"{config['orchestrator']['url']}/api/graphql"
headers = {"Authorization": f"Bearer {token}"}
response = session.post(api_url, headers=headers, json={"query": query})
response.raise_for_status()
data: dict[str, Any] = response.json()
if "errors" in data:
logger.error(f"GraphQL query returned errors: {data['errors']}. Query: {query}")
raise ValueError(f"GraphQL query errors: {data['errors']}")
return data
def extract_router(product_block_instances: list[dict[str, Any]]) -> str | None:
"""Extract router FQDNs from productBlockInstances."""
for instance in product_block_instances:
for value in instance.get("productBlockInstanceValues", []):
if value.get("field") == "routerFqdn" and value.get("value"):
return str(value["value"])
return None
def extract_trunk(product_block_instances: list[dict[str, Any]]) -> list[str] | None:
"""Extract trunks from productBlockInstances."""
fqdns = []
for instance in product_block_instances:
for value in instance.get("productBlockInstanceValues", []):
if value.get("field") == "routerFqdn" and value.get("value"):
fqdns.append(value["value"])
break
if len(fqdns) >= 2:
return [fqdns[0], fqdns[1]]
return None
def load_inventory(
config: dict[str, Any],
token: str,
tag: str,
session: Session,
extractor: Callable[[list[dict[str, Any]]], Any | None],
) -> list[Any]:
"""
Generic function to load inventory items based on tag and extractor function.
The extractor receives a list of productBlockInstances and returns parsed output.
"""
results = []
end_cursor = 0
has_next_page = True
while has_next_page:
query = f"""
query {{
subscriptions(
filterBy: {{field: "status", value: "PROVISIONING|ACTIVE"}},
first: 100,
after: {end_cursor},
query: "tag:({tag})"
) {{
pageInfo {{
hasNextPage
endCursor
}}
page {{
subscriptionId
product {{
tag
}}
productBlockInstances {{
productBlockInstanceValues
}}
}}
}}
}}
"""
data = make_request(query, token, config, session)
page_data = data.get("data", {}).get("subscriptions", {}).get("page", [])
page_info = data.get("data", {}).get("subscriptions", {}).get("pageInfo", {})
for item in page_data:
instances = item.get("productBlockInstances", [])
extracted = extractor(instances)
if extracted:
results.append(extracted)
has_next_page = page_info.get("hasNextPage", False)
end_cursor = page_info.get("endCursor", 0)
return results
def load_routers(config: config_dep) -> list[str]:
"""Load routers (nodes) from orchestrator."""
with requests.Session() as session:
token = get_token(config, session)
return load_inventory(config, token, tag="RTR", session=session, extractor=extract_router)
def load_trunks(config: config_dep) -> list[list[str]]:
"""Load trunks (edges) from orchestrator."""
with requests.Session() as session:
token = get_token(config, session)
return load_inventory(config, token, tag="IPTRUNK", session=session, extractor=extract_trunk)
[tool.ruff] [tool.ruff]
line-length = 120 line-length = 120
target-version = "py313" target-version = "py312"
select = ["E", "F", "I", "B", "UP", "N"] select = ["E", "F", "I", "B", "UP", "N"]
fixable = ["ALL"] fixable = ["ALL"]
exclude = ["tests", "docs", "build"] exclude = ["tests", "docs", "build"]
...@@ -9,4 +9,14 @@ exclude = ["tests", "docs", "build"] ...@@ -9,4 +9,14 @@ exclude = ["tests", "docs", "build"]
python_version = "3.13" python_version = "3.13"
strict = true strict = true
warn_unused_ignores = true warn_unused_ignores = true
warn_return_any = true warn_return_any = true
\ No newline at end of file allow_untyped_decorators = true
ignore_missing_imports = true
exclude = [
"venv",
"test/*",
"docs"
]
[[tool.mypy.overrides]]
module = ["requests.*"]
ignore_missing_imports = true
\ No newline at end of file
...@@ -13,7 +13,9 @@ setup( ...@@ -13,7 +13,9 @@ setup(
python_requires=">=3.10", python_requires=">=3.10",
install_requires=[ install_requires=[
"fastapi", "fastapi",
"uvicorn[standard]" "uvicorn[standard]",
"jsonschema",
"requests",
], ],
long_description=open("README.md", encoding="utf-8").read(), long_description=open("README.md", encoding="utf-8").read(),
long_description_content_type="text/markdown", long_description_content_type="text/markdown",
...@@ -23,4 +25,4 @@ setup( ...@@ -23,4 +25,4 @@ setup(
"License :: OSI Approved :: MIT License", "License :: OSI Approved :: MIT License",
"Operating System :: OS Independent", "Operating System :: OS Independent",
], ],
) )
\ No newline at end of file
[tox] [tox]
envlist = lint, typecheck, docs envlist = py313
[testenv:lint] [testenv]
description = Lint code with Ruff
deps = ruff
commands = ruff check mapping_provider
[testenv:typecheck]
description = Type-check code with mypy
deps = mypy
commands = mypy mapping_provider
[testenv:docs]
description = Build docs
deps = deps =
sphinx -r requirements.txt
sphinx-rtd-theme commands =
sphinxcontrib-plantuml ruff check --respect-gitignore --preview .
sphinxcontrib-drawio ruff format --respect-gitignore --preview --check .
sphinxcontrib-openapi mypy .
commands = sphinx-build -b html docs/source docs/build
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment