# map.py 4.61 KiB
from typing import Any
import jsonschema
import requests
from fastapi import APIRouter
from pydantic import BaseModel
from mapping_provider import config
from mapping_provider.backends import services
router = APIRouter()
class Pop(BaseModel):
    """One PoP record, as served in the ``pops`` list of the /pops response."""

    # Coordinates are the only nullable fields; the upstream schema
    # (INPROV_POP_LIST_SCHEMA) allows either a number or null for them.
    latitude: float | None
    longitude: float | None

    # Plain string attributes, copied unchanged from the upstream payload.
    name: str
    abbreviation: str
    city: str
    country: str
class PopList(BaseModel):
    """Response body of the /pops endpoint: a wrapper around a list of Pop."""

    pops: list[Pop]
class Equipment(BaseModel):
    """One equipment record, as served in the /equipment response."""

    name: str
    # NOTE(review): presumably identifies the PoP hosting this equipment;
    # which Pop attribute it matches is not visible here - confirm.
    pop: str
    status: str
class EquipmentList(BaseModel):
    """Response body of the /equipment endpoint: a list of Equipment."""

    equipment: list[Equipment]
# JSON schema applied by get_pops() to the payload fetched from the
# inventory provider's /map/pops resource: an array of "pop" objects.
# All six listed keys are required; latitude/longitude may be null and
# extra keys are explicitly allowed.
INPROV_POP_LIST_SCHEMA = {
    '$schema': 'https://json-schema.org/draft/2020-12/schema',
    'definitions': {
        'pop': {
            'type': 'object',
            'properties': {
                'name': {'type': 'string'},
                'abbreviation': {'type': 'string'},
                'city': {'type': 'string'},
                'country': {'type': 'string'},
                'latitude': {'type': ['number', 'null']},
                'longitude': {'type': ['number', 'null']},
            },
            'required': [
                'name',
                'abbreviation',
                'city',
                'country',
                'latitude',
                'longitude',
            ],
            'additionalProperties': True,
        },
    },
    'type': 'array',
    'items': {'$ref': '#/definitions/pop'},
}
# JSON schema applied by get_equipment() to the payload fetched from the
# inventory provider's /map/equipment resource: an array of "router"
# objects, each requiring string-valued name, pop and status keys.
INPROV_EQUIPMENT_LIST_SCHEMA = {
    '$schema': 'https://json-schema.org/draft/2020-12/schema',
    'definitions': {
        'router': {
            'type': 'object',
            'properties': {
                'name': {'type': 'string'},
                'pop': {'type': 'string'},
                'status': {'type': 'string'},
            },
            'required': [
                'name',
                'pop',
                'status',
            ],
        },
    },
    'type': 'array',
    'items': {'$ref': '#/definitions/router'},
}
# INPROV_SERVICE_LIST_SCHEMA = {
# '$schema': 'https://json-schema.org/draft/2020-12/schema',
# 'definitions': {
# 'endpoint': {
# 'type': 'object',
# 'properties': {
# 'hostname': {'type': 'string'},
# 'interface': {'type': 'string'},
# },
# },
# 'service': {
# 'type': 'object',
# 'properties': {
# 'sid': {'type': 'string'},
# 'name': {'type': 'string'},
# 'type': {'type': 'string'},
# 'endpoints': {
# 'type': 'array',
# 'items': {'$ref': '#/definitions/endpoint'},
# 'minItems': 1,
# },
# 'overlays': {
# 'type': 'object', 'properties': {
# 'speed': {'type': 'number'},
# },
# 'required': ['speed'],
# },
# },
# 'required': ['sid', 'name', 'type', 'endpoints', 'overlays'],
# },
# },
# 'type': 'array',
# 'items': {'$ref': '#/definitions/service'}
# }
# NOTE(review): hard-coded URL of a test inventory-provider host.  Nothing in
# the code visible here reads it - the handlers below take the base URL from
# config.load().inventory - and the _TODO suffix suggests it is a placeholder;
# confirm whether it is still needed.
INPROV_API_URL_TODO = 'https://test-inprov01.geant.org'
@router.get("/pops")
def get_pops() -> PopList:
    """
    Return the list of PoPs known to the inventory provider.

    The data is fetched from the inventory provider's ``/map/pops``
    resource, validated against ``INPROV_POP_LIST_SCHEMA`` and
    repackaged as a ``PopList``.
    """
    # TODO: catch/handle the usual exceptions.  As written,
    # requests exceptions (connection failure, timeout, HTTP error status
    # from raise_for_status) and jsonschema.ValidationError propagate
    # to the framework unhandled.
    app_params = config.load()
    rv = requests.get(
        f'{app_params.inventory}/map/pops',
        headers={'Accept': 'application/json'},
        # requests never times out by default, so a stalled upstream would
        # block this worker forever.  (connect, read) seconds - conservative
        # defaults, tune to the inventory provider's real latency.
        timeout=(5, 30))
    rv.raise_for_status()
    pop_list_obj = rv.json()
    jsonschema.validate(pop_list_obj, INPROV_POP_LIST_SCHEMA)

    def _make_pop(pop_dict: dict[str, Any]) -> Pop:
        # Copy only the modelled keys: the schema tolerates additional
        # properties, which must not leak into the response model.
        return Pop(
            latitude=pop_dict['latitude'],
            longitude=pop_dict['longitude'],
            name=pop_dict['name'],
            abbreviation=pop_dict['abbreviation'],
            city=pop_dict['city'],
            country=pop_dict['country'],
        )

    # Pass a real list: the field is declared list[Pop], and a lazy map
    # object only works where the validator happens to coerce iterators.
    return PopList(pops=[_make_pop(pop_dict) for pop_dict in pop_list_obj])
@router.get("/equipment")
def get_equipment() -> EquipmentList:
    """
    Return the list of equipment known to the inventory provider.

    The data is fetched from the inventory provider's ``/map/equipment``
    resource, validated against ``INPROV_EQUIPMENT_LIST_SCHEMA`` and
    repackaged as an ``EquipmentList``.
    """
    # TODO: catch/handle the usual exceptions.  As written,
    # requests exceptions (connection failure, timeout, HTTP error status
    # from raise_for_status) and jsonschema.ValidationError propagate
    # to the framework unhandled.
    app_params = config.load()
    rv = requests.get(
        f'{app_params.inventory}/map/equipment',
        headers={'Accept': 'application/json'},
        # requests never times out by default, so a stalled upstream would
        # block this worker forever.  (connect, read) seconds - conservative
        # defaults, tune to the inventory provider's real latency.
        timeout=(5, 30))
    rv.raise_for_status()
    equipment_list_obj = rv.json()
    jsonschema.validate(equipment_list_obj, INPROV_EQUIPMENT_LIST_SCHEMA)

    def _make_equipment(equipment_dict: dict[str, Any]) -> Equipment:
        # Copy only the modelled keys so that any extra upstream
        # properties are dropped from the response.
        return Equipment(
            name=equipment_dict['name'],
            pop=equipment_dict['pop'],
            status=equipment_dict['status'],
        )

    # Pass a real list: the field is declared list[Equipment], and a lazy
    # map object only works where the validator happens to coerce iterators.
    return EquipmentList(
        equipment=[_make_equipment(item) for item in equipment_list_obj])
@router.get("/trunks")
def get_trunks() -> services.ServiceList:
    """
    Return the services of type 'IP TRUNK'.

    Delegates entirely to ``services.build_service_info_list``.
    """
    trunk_services = services.build_service_info_list(service_type='IP TRUNK')
    return trunk_services