Skip to content
Snippets Groups Projects
Select Git revision
  • 5da5ce3abf66cf10b9b4360a70bc1099fe79eaf6
  • develop default
  • feature/NGM-41-fix-availability
  • feature/NGM-38-add-new-overlay-phisical-topology
  • NGM-30-hover-popup
  • master protected
  • svg-test
  • master-conflicted
  • 0.5
  • 0.4
  • 0.3
  • 0.2
  • 0.1
13 results

map.py

Blame
  • map.py 3.84 KiB
    from typing import Any
    
    import jsonschema
    import requests
    from fastapi import APIRouter, HTTPException
    from pydantic import BaseModel
    
    from mapping_provider import config
    from mapping_provider.backends import services
    
    # Router on which the endpoints defined in this module
    # (/pops, /equipment, /services and /trunks) are registered.
    router = APIRouter()
    
    
    class Pop(BaseModel):
        # One element of the /pops response; get_pops builds it from one
        # element of the inventory provider's pop list.
        # NOTE(review): "pop" presumably means point of presence - confirm.

        # Coordinates may be null: the inventory schema accepts
        # number-or-null for both.
        latitude: float | None
        longitude: float | None
        name: str
        abbreviation: str
        city: str
        country: str
    
    
    class PopList(BaseModel):
        # Response model of the /pops endpoint: the pops are wrapped
        # under a single 'pops' key.
        pops: list[Pop]
    
    
    class Equipment(BaseModel):
        # One element of the /equipment response; get_equipment builds it
        # from one element of the inventory provider's equipment list.
        name: str
        # NOTE(review): presumably refers to a Pop (by name or
        # abbreviation) - confirm against the inventory provider.
        pop: str
        status: str
    
    
    class EquipmentList(BaseModel):
        # Response model of the /equipment endpoint: the equipment items
        # are wrapped under a single 'equipment' key.
        equipment: list[Equipment]
    
    
    # JSON schema for the pop list returned by the inventory provider:
    # an array of objects, each carrying four string fields and two
    # nullable numeric coordinates.  Extra keys on a pop are accepted.
    INPROV_POP_LIST_SCHEMA = {
        '$schema': 'https://json-schema.org/draft/2020-12/schema',

        'definitions': {
            'pop': {
                'type': 'object',
                'properties': {
                    # plain text fields
                    **{
                        field: {'type': 'string'}
                        for field in ('name', 'abbreviation', 'city', 'country')
                    },
                    # coordinates, which may be null
                    **{
                        field: {'type': ['number', 'null']}
                        for field in ('latitude', 'longitude')
                    },
                },
                'required': [
                    'name',
                    'abbreviation',
                    'city',
                    'country',
                    'latitude',
                    'longitude',
                ],
                'additionalProperties': True,
            },
        },

        'type': 'array',
        'items': {'$ref': '#/definitions/pop'},
    }
    
    # JSON schema for the equipment list returned by the inventory
    # provider: an array of objects with three required string fields.
    INPROV_EQUIPMENT_LIST_SCHEMA = {
        '$schema': 'https://json-schema.org/draft/2020-12/schema',

        'definitions': {
            'router': {
                'type': 'object',
                'properties': {
                    field: {'type': 'string'}
                    for field in ('name', 'pop', 'status')
                },
                'required': [
                    'name',
                    'pop',
                    'status',
                ],
            },
        },

        'type': 'array',
        'items': {'$ref': '#/definitions/router'},
    }
    
    
    @router.get("/pops")
    def get_pops() -> PopList:
        """
        handler for /pops

        Returns the list of pops reported by the inventory provider.
        """
        # TODO: catch/handle the usual exceptions
        #   (for now a failed or timed-out request, an error status from
        #   the inventory provider, a non-json body or a schema violation
        #   all propagate out of this handler unchanged)

        app_params = config.load()
        rv = requests.get(
            f'{app_params.inventory}/map/pops',
            headers={'Accept': 'application/json'},
            # requests waits forever by default: bound the wait (seconds)
            # so an unresponsive inventory provider can't hang this handler
            timeout=30)
        rv.raise_for_status()
        pop_list_obj = rv.json()

        # the schema guarantees every key read below is present
        jsonschema.validate(pop_list_obj, INPROV_POP_LIST_SCHEMA)

        # copy only the known fields: the schema tolerates extra keys
        return PopList(pops=[
            Pop(
                latitude=pop_dict['latitude'],
                longitude=pop_dict['longitude'],
                name=pop_dict['name'],
                abbreviation=pop_dict['abbreviation'],
                city=pop_dict['city'],
                country=pop_dict['country'],
            )
            for pop_dict in pop_list_obj
        ])
    
    
    @router.get("/equipment")
    def get_equipment() -> EquipmentList:
        """
        handler for /equipment

        Returns the list of equipment reported by the inventory provider.
        """
        # TODO: catch/handle the usual exceptions
        #   (for now a failed or timed-out request, an error status from
        #   the inventory provider, a non-json body or a schema violation
        #   all propagate out of this handler unchanged)

        app_params = config.load()
        rv = requests.get(
            f'{app_params.inventory}/map/equipment',
            headers={'Accept': 'application/json'},
            # requests waits forever by default: bound the wait (seconds)
            # so an unresponsive inventory provider can't hang this handler
            timeout=30)
        rv.raise_for_status()
        equipment_list_obj = rv.json()

        # the schema guarantees every key read below is present
        jsonschema.validate(equipment_list_obj, INPROV_EQUIPMENT_LIST_SCHEMA)

        # copy only the known fields, ignoring anything else in the payload
        return EquipmentList(equipment=[
            Equipment(
                name=equipment_dict['name'],
                pop=equipment_dict['pop'],
                status=equipment_dict['status'],
            )
            for equipment_dict in equipment_list_obj
        ])
    
    
    @router.get("/services")
    @router.get("/services/{service_type}")
    def get_services(service_type: str | None = None) -> services.ServiceList:
        """
        handler for /services and /services/{service_type}

        Returns the services of the given type, or the result of an
        unfiltered lookup when no type is given.  Responds with 404 when
        the resulting service list is empty.
        """
        service_list = services.build_service_info_list(service_type=service_type)

        if service_list.services:
            return service_list

        # an empty result is reported as an unknown type; note that this
        # also applies when no service_type was requested at all
        raise HTTPException(
            status_code=404,
            detail=f'unrecognized service type: {service_type}',
        )
    
    
    @router.get("/trunks")
    def get_trunks() -> services.ServiceList:
        """
        handler for /trunks

        Equivalent to requesting /services/IP TRUNK.
        """
        # delegate to the generic services handler, including its 404
        # behaviour when nothing is found
        trunk_services = get_services(service_type='IP TRUNK')
        return trunk_services