Skip to content
Snippets Groups Projects
Commit f287f1ef authored by Erik Reid's avatar Erik Reid
Browse files

added /neteng/location/<equipment>

parent 28f3363a
No related branches found
No related tags found
No related merge requests found
"""
Neteng Support Endpoints
=========================
These endpoints are intended for use by neteng tools.
.. contents:: :local:
/neteng/location/equipment-name
---------------------------------
.. autofunction::inventory_provider.routes.neteng.get_location
"""
import json
import logging
import threading
from flask import Blueprint, Response, jsonify
from inventory_provider.routes import common
routes = Blueprint('neteng-query-routes', __name__)
logger = logging.getLogger(__name__)
_subnet_lookup_semaphore = threading.Semaphore()
@routes.after_request
def after_request(resp):
return common.after_request(resp)
@routes.route('/location/<equipment>', methods=['GET', 'POST'])
@common.require_accepts_json
def get_location(equipment):
"""
Handler for `/neteng/location/equipment-name`
This method will pop location information for the IMS node
with name = `equipment-name`.
404 is returned if the IMS node name is not known.
Otherwise the return value will be formatted as:
.. as_json::
inventory_provider.db.ims_data.NODE_LOCATION_SCHEMA
:return: as above
"""
r = common.get_current_redis()
value = r.get(f'ims:location:{equipment}')
if not value:
return Response(
response='no location information available for "{equipment}"',
status=404,
mimetype='text/html')
value = json.loads(value.decode('utf-8'))
if not value:
return Response(
response='unexpected empty cached data for "{equipment}"',
status=500,
mimetype='text/html')
return jsonify(value[0])
import json
import jsonschema
from inventory_provider.routes import common
from inventory_provider.routes.default import VERSION_SCHEMA
DEFAULT_REQUEST_HEADERS = {
'Content-type': 'application/json',
'Accept': ['application/json']
}
def test_version_request(client, mocked_redis):
rv = client.post(
'version',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
jsonschema.validate(
json.loads(rv.data.decode('utf-8')),
VERSION_SCHEMA)
def test_load_json_docs(data_config, mocked_redis):
INTERFACE_SCHEMA = {
'$schema': 'http://json-schema.org/draft-07/schema#',
'definitions': {
'interface': {
'type': 'object',
'properties': {
'logical-system': {'type': 'string'},
'name': {'type': 'string'},
'description': {'type': 'string'},
'bundle': {
'type': 'array',
'items': {'type': 'string'}
},
'ipv4': {
'type': 'array',
'items': {'type': 'string'}
},
'ipv6': {
'type': 'array',
'items': {'type': 'string'}
}
},
'required': ['name', 'description', 'ipv4', 'ipv6'],
'additionalProperties': False
}
},
'type': 'object',
'properties': {
'key': {'type': 'string'},
'value': {'$ref': '#/definitions/interface'}
},
'required': ['key', 'value'],
'additionalProperties': False
}
for ifc in common.load_json_docs(
data_config, 'netconf-interfaces:*', num_threads=20):
jsonschema.validate(ifc, INTERFACE_SCHEMA)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment