Skip to content
Snippets Groups Projects
Commit ae268861 authored by Erik Reid's avatar Erik Reid
Browse files

Finished feature poller-all-interfaces.

parents 3dbec728 e9c2b96a
No related branches found
No related tags found
No related merge requests found
...@@ -6,11 +6,14 @@ import queue ...@@ -6,11 +6,14 @@ import queue
import random import random
import threading import threading
from lxml import etree
import requests import requests
from flask import request, Response, current_app, g from flask import request, Response, current_app, g
from inventory_provider.tasks import common as tasks_common from inventory_provider.tasks import common as tasks_common
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_DECODE_TYPE_XML = 'xml'
_DECODE_TYPE_JSON = 'json'
def ims_hostname_decorator(field): def ims_hostname_decorator(field):
...@@ -109,11 +112,11 @@ def after_request(response): ...@@ -109,11 +112,11 @@ def after_request(response):
return response return response
def _redis_client_proc(key_queue, value_queue, config_params): def _redis_client_proc(key_queue, value_queue, config_params, doc_type):
""" """
create a local redis connection with the current db index, create a local redis connection with the current db index,
lookup the values of the keys that come from key_queue lookup the values of the keys that come from key_queue
and put them o=n value_queue and put them on value_queue
i/o contract: i/o contract:
None arriving on key_queue means no more keys are coming None arriving on key_queue means no more keys are coming
...@@ -122,8 +125,18 @@ def _redis_client_proc(key_queue, value_queue, config_params): ...@@ -122,8 +125,18 @@ def _redis_client_proc(key_queue, value_queue, config_params):
:param key_queue: :param key_queue:
:param value_queue: :param value_queue:
:param config_params: app config :param config_params: app config
:return: yields dicts like {'key': str, 'value': dict} :param doc_type: decoding type to do (xml or json)
:return: nothing
""" """
assert doc_type in (_DECODE_TYPE_JSON, _DECODE_TYPE_XML)
def _decode(bv):
value = bv.decode('utf-8')
if doc_type == _DECODE_TYPE_JSON:
return json.loads(value)
elif doc_type == _DECODE_TYPE_XML:
return etree.XML(value)
try: try:
r = tasks_common.get_current_redis(config_params) r = tasks_common.get_current_redis(config_params)
while True: while True:
...@@ -133,10 +146,9 @@ def _redis_client_proc(key_queue, value_queue, config_params): ...@@ -133,10 +146,9 @@ def _redis_client_proc(key_queue, value_queue, config_params):
if not key: if not key:
break break
value = r.get(key).decode('utf-8')
value_queue.put({ value_queue.put({
'key': key, 'key': key,
'value': json.loads(value) 'value': _decode(r.get(key))
}) })
except json.JSONDecodeError: except json.JSONDecodeError:
...@@ -147,9 +159,13 @@ def _redis_client_proc(key_queue, value_queue, config_params): ...@@ -147,9 +159,13 @@ def _redis_client_proc(key_queue, value_queue, config_params):
value_queue.put(None) value_queue.put(None)
def load_json_docs(config_params, key_pattern, num_threads=10): def _load_redis_docs(
config_params,
key_pattern,
num_threads=10,
doc_type=_DECODE_TYPE_JSON):
""" """
load all json docs from redis load all docs from redis and decode as `doc_type`
the loading is done with multiple connections in parallel, since this the loading is done with multiple connections in parallel, since this
method is called from an api handler and when the client is far from method is called from an api handler and when the client is far from
...@@ -158,8 +174,10 @@ def load_json_docs(config_params, key_pattern, num_threads=10): ...@@ -158,8 +174,10 @@ def load_json_docs(config_params, key_pattern, num_threads=10):
:param config_params: app config :param config_params: app config
:param pattern: key pattern to load :param pattern: key pattern to load
:param num_threads: number of client threads to create :param num_threads: number of client threads to create
:return: yields dicts like {'key': str, 'value': dict} :param doc_type: decoding type to do (xml or json)
:return: yields dicts like {'key': str, 'value': dict or xml doc}
""" """
assert doc_type in (_DECODE_TYPE_XML, _DECODE_TYPE_JSON)
response_queue = queue.Queue() response_queue = queue.Queue()
threads = [] threads = []
...@@ -167,16 +185,15 @@ def load_json_docs(config_params, key_pattern, num_threads=10): ...@@ -167,16 +185,15 @@ def load_json_docs(config_params, key_pattern, num_threads=10):
q = queue.Queue() q = queue.Queue()
t = threading.Thread( t = threading.Thread(
target=_redis_client_proc, target=_redis_client_proc,
args=[q, response_queue, config_params]) args=[q, response_queue, config_params, doc_type])
t.start() t.start()
threads.append({'thread': t, 'queue': q}) threads.append({'thread': t, 'queue': q})
r = tasks_common.get_current_redis(config_params) r = tasks_common.get_current_redis(config_params)
# scan with bigger batches, to mitigate network latency effects # scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter(key_pattern, count=1000): for k in r.scan_iter(key_pattern, count=1000):
k = k.decode('utf-8')
t = random.choice(threads) t = random.choice(threads)
t['queue'].put(k) t['queue'].put(k.decode('utf-8'))
# tell all threads there are no more keys coming # tell all threads there are no more keys coming
for t in threads: for t in threads:
...@@ -196,3 +213,13 @@ def load_json_docs(config_params, key_pattern, num_threads=10): ...@@ -196,3 +213,13 @@ def load_json_docs(config_params, key_pattern, num_threads=10):
# cleanup like we're supposed to, even though it's python # cleanup like we're supposed to, even though it's python
for t in threads: for t in threads:
t['thread'].join(timeout=0.5) # timeout, for sanity t['thread'].join(timeout=0.5) # timeout, for sanity
def load_json_docs(config_params, key_pattern, num_threads=10):
yield from _load_redis_docs(
config_params, key_pattern, num_threads, doc_type=_DECODE_TYPE_JSON)
def load_xml_docs(config_params, key_pattern, num_threads=10):
yield from _load_redis_docs(
config_params, key_pattern, num_threads, doc_type=_DECODE_TYPE_XML)
import json import json
import logging
import re
from flask import Blueprint, Response, jsonify, current_app from flask import Blueprint, Response, current_app
from lxml import etree
from inventory_provider import juniper from inventory_provider import juniper
from inventory_provider.routes import common from inventory_provider.routes import common
logger = logging.getLogger(__name__)
routes = Blueprint('poller-support-routes', __name__) routes = Blueprint('poller-support-routes', __name__)
...@@ -13,75 +15,151 @@ def after_request(resp): ...@@ -13,75 +15,151 @@ def after_request(resp):
return common.after_request(resp) return common.after_request(resp)
@routes.route('/interfaces/<hostname>', methods=['GET', 'POST']) def _load_snmp_indexes(hostname=None):
@common.require_accepts_json result = dict()
def poller_interface_oids(hostname): key_pattern = f'snmp-interfaces:{hostname}*' \
r = common.get_current_redis() if hostname else 'snmp-interfaces:*'
for doc in common.load_json_docs(
config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'],
key_pattern=key_pattern):
router = doc['key'][len('snmp-interfaces:'):]
interfaces = dict(
[(e['name'], e['index']) for e in doc['value']])
result[router] = interfaces
return result
def _load_interface_bundles(hostname=None):
result = dict()
key_pattern = f'netconf-interface-bundles:{hostname}:*' \
if hostname else 'netconf-interface-bundles:*'
for doc in common.load_json_docs(
config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'],
key_pattern=key_pattern,
num_threads=20):
m = re.match(r'^netconf-interface-bundles:([^:]+):(.+)', doc['key'])
assert m
router = m.group(1)
interface = m.group(2)
result.setdefault(router, dict())
result[router][interface] = doc['value']
return result
netconf_string = r.get('netconf:' + hostname) def _load_services(hostname=None):
if not netconf_string: result = dict()
return Response( key_pattern = f'opsdb:interface_services:{hostname}:*' \
response='no netconf available info for %r' % hostname, if hostname else 'opsdb:interface_services:*'
status=404,
mimetype='text/html') def _service_params(full_service_info):
return {
snmp_data_string = r.get('snmp-interfaces:' + hostname) 'id': full_service_info['id'],
if not snmp_data_string: 'name': full_service_info['name'],
return Response( 'type': full_service_info['service_type'],
response='no snmp available info for %r' % hostname, 'status': full_service_info['status']
status=404, }
mimetype='text/html')
for doc in common.load_json_docs(
snmp_indexes = {} config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'],
for ifc in json.loads(snmp_data_string.decode('utf-8')): key_pattern=key_pattern,
snmp_indexes[ifc['name']] = ifc['index'] num_threads=20):
interfaces = list(juniper.list_interfaces( m = re.match(r'^opsdb:interface_services:([^:]+):(.+)', doc['key'])
etree.XML(netconf_string.decode('utf-8')))) if not m:
logger.warning(f'can\'t parse redis service key {doc["key"]}')
if not interfaces: # there are some weird records (dtn*, dp1*)
return Response(
response='no interfaces found for %r' % hostname,
status=404,
mimetype='text/html')
result = []
for ifc in interfaces:
if not ifc['description']:
continue continue
snmp_index = snmp_indexes.get(ifc['name'], None) router = m.group(1)
if not snmp_index: interface = m.group(2)
result.setdefault(router, dict())
result[router][interface] = [_service_params(s) for s in doc['value']]
return result
def _load_interfaces(hostname):
key_pattern = f'netconf:{hostname}*' if hostname else 'netconf:*'
for doc in common.load_xml_docs(
config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'],
key_pattern=key_pattern,
num_threads=10):
router = doc['key'][len('netconf:'):]
for ifc in juniper.list_interfaces(doc['value']):
if not ifc['description']:
continue
yield {
'router': router,
'name': ifc['name'],
'bundle': ifc['bundle'],
'bundle-parents': [],
'snmp-index': -1,
'description': ifc['description'],
'circuits': []
}
def _load_poller_interfaces(hostname=None):
snmp_indexes = _load_snmp_indexes(hostname)
bundles = _load_interface_bundles(hostname)
services = _load_services(hostname)
for ifc in _load_interfaces(hostname):
router_snmp = snmp_indexes.get(ifc['router'], None)
if not router_snmp or ifc['name'] not in router_snmp:
# there's no way to poll this interface
continue continue
ifc['snmp-index'] = router_snmp[ifc['name']]
bundle_parents = r.get('netconf-interface-bundles:%s:%s' % ( router_bundle = bundles.get(ifc['router'], None)
hostname, ifc['name'].split('.')[0])) if router_bundle:
base_ifc = ifc['name'].split('.')[0]
ifc_data = { ifc['bundle-parents'] = router_bundle.get(base_ifc, [])
'name': ifc['name'],
'bundle': ifc['bundle'], router_services = services.get(ifc['router'], None)
'bundle-parents': if router_services:
json.loads(bundle_parents) if bundle_parents else [], ifc['circuits'] = router_services.get(ifc['name'], [])
'snmp-index': snmp_index,
'description': ifc['description'],
'circuits': []
}
circuits = r.get( yield ifc
'opsdb:interface_services:%s:%s' % (hostname, ifc['name']))
if circuits:
ifc_data['circuits'] = [ @routes.route("/interfaces", methods=['GET', 'POST'])
{ @routes.route('/interfaces/<hostname>', methods=['GET', 'POST'])
'id': c['id'], @common.require_accepts_json
'name': c['name'], def poller_interface_oids(hostname=None):
'type': c['service_type'],
'status': c['status'] cache_key = f'classifier-cache:poller-interfaces:{hostname}' \
} for c in json.loads(circuits.decode('utf-8')) if hostname else 'classifier-cache:poller-interfaces:all'
]
r = common.get_current_redis()
result.append(ifc_data)
result = r.get(cache_key)
return jsonify(result) if result:
result = result.decode('utf-8')
else:
result = list(_load_poller_interfaces(hostname))
if not result:
return Response(
response='no interfaces found',
status=404,
mimetype='text/html')
result = json.dumps(result)
# cache this data for the next call
r.set(cache_key, result.encode('utf-8'))
return Response(result, mimetype="application/json")
@routes.route('/services/<category>', methods=['GET', 'POST']) @routes.route('/services/<category>', methods=['GET', 'POST'])
......
...@@ -6,61 +6,63 @@ DEFAULT_REQUEST_HEADERS = { ...@@ -6,61 +6,63 @@ DEFAULT_REQUEST_HEADERS = {
"Accept": ["application/json"] "Accept": ["application/json"]
} }
INTERFACE_LIST_SCHEMA = {
'$schema': 'http://json-schema.org/draft-07/schema#',
def test_router_interfaces(router, client): 'definitions': {
interfaces_list_schema = { 'service': {
"$schema": "http://json-schema.org/draft-07/schema#", 'type': 'object',
'properties': {
"definitions": { 'id': {'type': 'integer'},
"circuit": { 'name': {'type': 'string'},
"type": "object", 'type': {'type': 'string'},
"properties": { 'status': {'type': 'string'},
"name": {"type": "string"}, },
"status": {"type": "string"}, 'required': ['id', 'name', 'type', 'status'],
"type": {"type": "string"}, 'additionalProperties': False
"id": {"type": "integer"}
},
"required": ["name", "status", "type", "id"],
"additionalProperties": False
}
}, },
'interface': {
"type": "array", 'type': 'object',
"items": { 'properties': {
"type": "object", 'router': {'type': 'string'},
"properties": { 'name': {'type': 'string'},
"circuits": { 'description': {'type': 'string'},
"type": "array", 'snmp-index': {
"items": {"$ref": "#/definitions/circuit"} 'type': 'integer',
'minimum': 1
}, },
"bundle": { 'bundle': {
"type": "array", 'type': 'array',
"items": {"type": "string"} 'items': {'type': 'string'}
}, },
"bundle-parents": { 'bundle-parents': {
"type": "array", 'type': 'array',
"items": {"type": "string"} 'items': {'type': 'string'}
}, },
"description": {"type": "string"}, 'circuits': {
"name": {"type": "string"}, 'type': 'array',
"snmp-index": {"type": "integer"} 'items': {'$ref': '#/definitions/service'}
}
}, },
"required": [ 'required': [
"circuits", 'router', 'name', 'description',
"bundle", 'snmp-index', 'bundle', 'bundle-parents',
"bundle-parents", 'circuits'],
"description", 'additionalProperties': False
"name", },
"snmp-index"], },
"additionalProperties": False
} 'type': 'array',
} 'items': {'$ref': '#/definitions/interface'}
}
def test_router_interfaces(router, client):
rv = client.post( rv = client.post(
"/poller/interfaces/" + router, f'/poller/interfaces/{router}',
headers=DEFAULT_REQUEST_HEADERS) headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200 assert rv.status_code == 200
response = json.loads(rv.data.decode("utf-8")) response = json.loads(rv.data.decode("utf-8"))
jsonschema.validate(response, interfaces_list_schema) jsonschema.validate(response, INTERFACE_LIST_SCHEMA)
assert response # at least shouldn't be empty assert response # at least shouldn't be empty
...@@ -11,7 +11,7 @@ DEFAULT_REQUEST_HEADERS = { ...@@ -11,7 +11,7 @@ DEFAULT_REQUEST_HEADERS = {
} }
INTERFACE_LIST_SCHEMA = { SCHEMA_INTERFACE_LIST_SCHEMA = {
'$schema': 'http://json-schema.org/draft-07/schema#', '$schema': 'http://json-schema.org/draft-07/schema#',
'definitions': { 'definitions': {
...@@ -33,6 +33,57 @@ INTERFACE_LIST_SCHEMA = { ...@@ -33,6 +33,57 @@ INTERFACE_LIST_SCHEMA = {
} }
INTERFACE_LIST_SCHEMA = {
'$schema': 'http://json-schema.org/draft-07/schema#',
'definitions': {
'service': {
'type': 'object',
'properties': {
'id': {'type': 'integer'},
'name': {'type': 'string'},
'type': {'type': 'string'},
'status': {'type': 'string'},
},
'required': ['id', 'name', 'type', 'status'],
'additionalProperties': False
},
'interface': {
'type': 'object',
'properties': {
'router': {'type': 'string'},
'name': {'type': 'string'},
'description': {'type': 'string'},
'snmp-index': {
'type': 'integer',
'minimum': 1
},
'bundle': {
'type': 'array',
'items': {'type': 'string'}
},
'bundle-parents': {
'type': 'array',
'items': {'type': 'string'}
},
'circuits': {
'type': 'array',
'items': {'$ref': '#/definitions/service'}
}
},
'required': [
'router', 'name', 'description',
'snmp-index', 'bundle', 'bundle-parents',
'circuits'],
'additionalProperties': False
},
},
'type': 'array',
'items': {'$ref': '#/definitions/interface'}
}
@pytest.mark.parametrize('category', ['mdvpn', 'lhcone', 'MDVpn', 'LHCONE']) @pytest.mark.parametrize('category', ['mdvpn', 'lhcone', 'MDVpn', 'LHCONE'])
def test_service_category(client, mocked_worker_module, category): def test_service_category(client, mocked_worker_module, category):
worker._build_service_category_interface_list() worker._build_service_category_interface_list()
...@@ -42,7 +93,7 @@ def test_service_category(client, mocked_worker_module, category): ...@@ -42,7 +93,7 @@ def test_service_category(client, mocked_worker_module, category):
assert rv.status_code == 200 assert rv.status_code == 200
assert rv.is_json assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8')) response_data = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(response_data, INTERFACE_LIST_SCHEMA) jsonschema.validate(response_data, SCHEMA_INTERFACE_LIST_SCHEMA)
assert response_data, 'expected a non-empty list' assert response_data, 'expected a non-empty list'
...@@ -53,3 +104,14 @@ def test_service_category_not_found(client, mocked_worker_module, category): ...@@ -53,3 +104,14 @@ def test_service_category_not_found(client, mocked_worker_module, category):
f'/poller/services/{category}', f'/poller/services/{category}',
headers=DEFAULT_REQUEST_HEADERS) headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 404 assert rv.status_code == 404
def test_get_all_interfaces(client):
rv = client.get(
'/poller/interfaces',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(response_data, INTERFACE_LIST_SCHEMA)
assert response_data, 'expected a non-empty list'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment