# map.py 3.84 KiB
import logging
from typing import Any

import jsonschema
import requests
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from mapping_provider import config
from mapping_provider.backends import services
# Router on which the handlers defined in this module are registered
# (via the @router.get decorators).
router = APIRouter()
class Pop(BaseModel):
    """
    A single pop, as returned by the /pops handler.
    """
    # coordinates may be None: the inventory provider schema used by
    # get_pops allows null for both of them
    latitude: float | None
    longitude: float | None
    name: str
    abbreviation: str
    city: str
    country: str
class PopList(BaseModel):
    """
    Response model of the /pops handler: a wrapper around a list of pops.
    """
    pops: list[Pop]
class Equipment(BaseModel):
    """
    A single piece of equipment, as returned by the /equipment handler.
    """
    name: str
    # NOTE(review): presumably refers to a Pop (by name or abbreviation),
    # confirm against the inventory provider
    pop: str
    status: str
class EquipmentList(BaseModel):
    """
    Response model of the /equipment handler: a wrapper around a list
    of equipment records.
    """
    equipment: list[Equipment]
# Shape of one pop record in the inventory provider response.
# All six keys must be present; the coordinates may be null and
# any additional keys are tolerated.
_INPROV_POP_SCHEMA = {
    'type': 'object',
    'properties': {
        'name': {'type': 'string'},
        'abbreviation': {'type': 'string'},
        'city': {'type': 'string'},
        'country': {'type': 'string'},
        'latitude': {'type': ['number', 'null']},
        'longitude': {'type': ['number', 'null']},
    },
    'required': [
        'name',
        'abbreviation',
        'city',
        'country',
        'latitude',
        'longitude',
    ],
    'additionalProperties': True,
}

# The payload validated by get_pops: an array of pop records.
INPROV_POP_LIST_SCHEMA = {
    '$schema': 'https://json-schema.org/draft/2020-12/schema',
    'definitions': {'pop': _INPROV_POP_SCHEMA},
    'type': 'array',
    'items': {'$ref': '#/definitions/pop'},
}
# Shape of one equipment record in the inventory provider response:
# three mandatory string fields.
_INPROV_EQUIPMENT_SCHEMA = {
    'type': 'object',
    'properties': {
        'name': {'type': 'string'},
        'pop': {'type': 'string'},
        'status': {'type': 'string'},
    },
    'required': ['name', 'pop', 'status'],
}

# The payload validated by get_equipment: an array of equipment records
# (the definition is registered under the key 'router').
INPROV_EQUIPMENT_LIST_SCHEMA = {
    '$schema': 'https://json-schema.org/draft/2020-12/schema',
    'definitions': {'router': _INPROV_EQUIPMENT_SCHEMA},
    'type': 'array',
    'items': {'$ref': '#/definitions/router'},
}
@router.get("/pops")
def get_pops() -> PopList:
    """
    handler for /pops

    Fetches the pop list from the inventory provider (``/map/pops``),
    validates the payload against ``INPROV_POP_LIST_SCHEMA`` and
    repackages it as a ``PopList``.

    :return: the pops reported by the inventory provider
    :raises HTTPException: 502 if the inventory provider request fails
        or times out, answers with an error status, or returns a payload
        that is not json or does not match the expected schema
    """
    app_params = config.load()

    try:
        rv = requests.get(
            f'{app_params.inventory}/map/pops',
            headers={'Accept': 'application/json'},
            # (connect, read) seconds: requests has no default timeout, so
            # without this a stalled inventory provider blocks the worker
            timeout=(5, 60))
        rv.raise_for_status()
        pop_list_obj: list[dict[str, Any]] = rv.json()
        jsonschema.validate(pop_list_obj, INPROV_POP_LIST_SCHEMA)
    except (
            requests.RequestException,
            ValueError,  # json decoding errors from older requests versions
            jsonschema.ValidationError) as e:
        # keep the details in the server log, not in the client response
        logging.getLogger(__name__).exception(
            'failed to load pops from the inventory provider')
        raise HTTPException(
            status_code=502,
            detail='error loading pops from the inventory provider') from e

    # the schema allows additional properties, so pick the fields explicitly
    return PopList(pops=[
        Pop(
            latitude=pop_dict['latitude'],
            longitude=pop_dict['longitude'],
            name=pop_dict['name'],
            abbreviation=pop_dict['abbreviation'],
            city=pop_dict['city'],
            country=pop_dict['country'],
        )
        for pop_dict in pop_list_obj
    ])
@router.get("/equipment")
def get_equipment() -> EquipmentList:
    """
    handler for /equipment

    Fetches the equipment list from the inventory provider
    (``/map/equipment``), validates the payload against
    ``INPROV_EQUIPMENT_LIST_SCHEMA`` and repackages it as an
    ``EquipmentList``.

    :return: the equipment reported by the inventory provider
    :raises HTTPException: 502 if the inventory provider request fails
        or times out, answers with an error status, or returns a payload
        that is not json or does not match the expected schema
    """
    app_params = config.load()

    try:
        rv = requests.get(
            f'{app_params.inventory}/map/equipment',
            headers={'Accept': 'application/json'},
            # (connect, read) seconds: requests has no default timeout, so
            # without this a stalled inventory provider blocks the worker
            timeout=(5, 60))
        rv.raise_for_status()
        equipment_list_obj: list[dict[str, Any]] = rv.json()
        jsonschema.validate(equipment_list_obj, INPROV_EQUIPMENT_LIST_SCHEMA)
    except (
            requests.RequestException,
            ValueError,  # json decoding errors from older requests versions
            jsonschema.ValidationError) as e:
        # keep the details in the server log, not in the client response
        logging.getLogger(__name__).exception(
            'failed to load equipment from the inventory provider')
        raise HTTPException(
            status_code=502,
            detail='error loading equipment from the inventory provider') from e

    return EquipmentList(equipment=[
        Equipment(
            name=equipment_dict['name'],
            pop=equipment_dict['pop'],
            status=equipment_dict['status'],
        )
        for equipment_dict in equipment_list_obj
    ])
@router.get("/services")
@router.get("/services/{service_type}")
def get_services(service_type: str | None = None) -> services.ServiceList:
    """
    handler for /services and /services/{service_type}

    :param service_type: passed through to
        ``services.build_service_info_list``; None when the plain
        /services route is used
    :return: the service list built by ``services.build_service_info_list``
    :raises HTTPException: 404 if the resulting service list is empty
    """
    service_list = services.build_service_info_list(service_type=service_type)
    if service_list.services:
        return service_list

    # an empty result is reported to the client as an unknown service type
    raise HTTPException(status_code=404, detail=f'unrecognized service type: {service_type}')
@router.get("/trunks")
def get_trunks() -> services.ServiceList:
    """
    handler for /trunks

    Shortcut for /services/IP TRUNK: delegates to get_services with the
    service type fixed to 'IP TRUNK'.
    """
    trunk_services = get_services(service_type='IP TRUNK')
    return trunk_services