Commit 73b1925a authored by Erik Reid

support /poller/interfaces without hostname, returning interfaces for all routers
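
Both request forms are shown below (an illustrative sketch; the base URL and
router name are assumptions, not part of this commit):

import requests

BASE = 'http://localhost:8080'  # assumed local inventory-provider instance
HEADERS = {'Accept': 'application/json'}  # the handler requires a JSON accept header

# previously supported: interfaces of a single router
one = requests.get(f'{BASE}/poller/interfaces/mx1.ams.nl.geant.net', headers=HEADERS)

# new in this commit: no hostname -> interfaces of all routers
everything = requests.get(f'{BASE}/poller/interfaces', headers=HEADERS)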

parent 3dbec728
@@ -6,11 +6,14 @@ import queue
import random
import threading
from lxml import etree
import requests
from flask import request, Response, current_app, g
from inventory_provider.tasks import common as tasks_common
logger = logging.getLogger(__name__)
_DECODE_TYPE_XML = 'xml'
_DECODE_TYPE_JSON = 'json'
def ims_hostname_decorator(field):
@@ -109,11 +112,11 @@ def after_request(response):
return response
def _redis_client_proc(key_queue, value_queue, config_params):
def _redis_client_proc(key_queue, value_queue, config_params, doc_type):
"""
create a local redis connection with the current db index,
lookup the values of the keys that come from key_queue
and put them o=n value_queue
and put them on value_queue
i/o contract:
None arriving on key_queue means no more keys are coming
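
A minimal driver for this contract, as a sketch (config_params would come
from the real app config; the key is illustrative):

import queue
import threading

key_q, value_q = queue.Queue(), queue.Queue()
worker = threading.Thread(
    target=_redis_client_proc,
    args=[key_q, value_q, config_params, _DECODE_TYPE_JSON])
worker.start()

key_q.put('snmp-interfaces:mx1.ams.nl.geant.net')  # illustrative key
key_q.put(None)  # sentinel: no more keys are coming

while True:
    item = value_q.get()
    if item is None:  # sentinel: the worker has finished
        break
    print(item['key'], item['value'])
worker.join()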
@@ -122,8 +125,18 @@ def _redis_client_proc(key_queue, value_queue, config_params):
:param key_queue:
:param value_queue:
:param config_params: app config
:return: yields dicts like {'key': str, 'value': dict}
:param doc_type: decoding type to do (xml or json)
:return: nothing
"""
assert doc_type in (_DECODE_TYPE_JSON, _DECODE_TYPE_XML)
def _decode(bv):
value = bv.decode('utf-8')
if doc_type == _DECODE_TYPE_JSON:
return json.loads(value)
elif doc_type == _DECODE_TYPE_XML:
return etree.XML(value)
try:
r = tasks_common.get_current_redis(config_params)
while True:
@@ -133,10 +146,9 @@ def _redis_client_proc(key_queue, value_queue, config_params):
if not key:
break
value = r.get(key).decode('utf-8')
value_queue.put({
'key': key,
'value': json.loads(value)
'value': _decode(r.get(key))
})
except json.JSONDecodeError:
@@ -147,9 +159,10 @@ def _redis_client_proc(key_queue, value_queue, config_params):
value_queue.put(None)
def load_json_docs(config_params, key_pattern, num_threads=10):
def _load_redis_docs(
config_params, key_pattern, num_threads=10, doc_type=_DECODE_TYPE_JSON):
"""
load all json docs from redis
load all docs from redis and decode as `doc_type`
the loading is done with multiple connections in parallel, since this
method is called from an api handler and when the client is far from
@@ -158,8 +171,10 @@ def load_json_docs(config_params, key_pattern, num_threads=10):
:param config_params: app config
:param key_pattern: key pattern to load
:param num_threads: number of client threads to create
:return: yields dicts like {'key': str, 'value': dict}
:param doc_type: decoding type to do (xml or json)
:return: yields dicts like {'key': str, 'value': dict or xml doc}
"""
assert doc_type in (_DECODE_TYPE_XML, _DECODE_TYPE_JSON)
response_queue = queue.Queue()
threads = []
@@ -167,16 +182,15 @@ def load_json_docs(config_params, key_pattern, num_threads=10):
q = queue.Queue()
t = threading.Thread(
target=_redis_client_proc,
args=[q, response_queue, config_params])
args=[q, response_queue, config_params, doc_type])
t.start()
threads.append({'thread': t, 'queue': q})
r = tasks_common.get_current_redis(config_params)
# scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter(key_pattern, count=1000):
k = k.decode('utf-8')
t = random.choice(threads)
t['queue'].put(k)
t['queue'].put(k.decode('utf-8'))
# tell all threads there are no more keys coming
for t in threads:
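
For reference, count is only a batching hint to SCAN and doesn't change which
keys are returned; a standalone sketch using the redis-py client (connection
parameters assumed):

import redis

r = redis.StrictRedis()  # assumed local redis with default parameters
# count=1000 asks the server for bigger batches per SCAN round-trip,
# which amortizes network latency on high-latency links
for key in r.scan_iter('netconf:*', count=1000):
    print(key.decode('utf-8'))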
@@ -196,3 +210,13 @@ def load_json_docs(config_params, key_pattern, num_threads=10):
# cleanup like we're supposed to, even though it's python
for t in threads:
t['thread'].join(timeout=0.5) # timeout, for sanity
def load_json_docs(config_params, key_pattern, num_threads=10):
yield from _load_redis_docs(
config_params, key_pattern, num_threads, doc_type=_DECODE_TYPE_JSON)
def load_xml_docs(config_params, key_pattern, num_threads=10):
yield from _load_redis_docs(
config_params, key_pattern, num_threads, doc_type=_DECODE_TYPE_XML)
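
The two wrappers are consumed identically; only the decoded value type
differs (sketch; config_params is the app config dict):

# JSON documents decode to python objects (dicts/lists)
for doc in load_json_docs(config_params, 'snmp-interfaces:*'):
    print(doc['key'], type(doc['value']))

# XML documents decode to lxml elements
for doc in load_xml_docs(config_params, 'netconf:*'):
    print(doc['key'], doc['value'].tag)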
import json
import logging
import re
from flask import Blueprint, Response, jsonify, current_app
from lxml import etree
from inventory_provider import juniper
from inventory_provider.routes import common
logger = logging.getLogger(__name__)
routes = Blueprint('poller-support-routes', __name__)
@@ -13,75 +15,150 @@ def after_request(resp):
return common.after_request(resp)
@routes.route('/interfaces/<hostname>', methods=['GET', 'POST'])
@common.require_accepts_json
def poller_interface_oids(hostname):
r = common.get_current_redis()
def _load_snmp_indexes(hostname=None):
result = dict()
key_pattern = f'snmp-interfaces:{hostname}' \
if hostname else 'snmp-interfaces:*'
for doc in common.load_json_docs(
config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'],
key_pattern=key_pattern):
router = doc['key'][len('snmp-interfaces:'):]
interfaces = dict(
[(e['name'], e['index']) for e in doc['value']])
result[router] = interfaces
return result
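
The returned mapping is keyed on router name, then interface name (a sketch
with illustrative values):

{
    'mx1.ams.nl.geant.net': {'xe-0/0/0': 531, 'ae15.1500': 611},
    'mx1.lon.uk.geant.net': {'ge-0/0/1': 502}
}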
def _load_interface_bundles(hostname=None):
result = dict()
key_pattern = f'netconf-interface-bundles:{hostname}:*' \
if hostname else 'netconf-interface-bundles:*'
for doc in common.load_json_docs(
config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'],
key_pattern=key_pattern,
num_threads=20):
m = re.match(r'^netconf-interface-bundles:([^:]+):(.+)', doc['key'])
assert m
router = m.group(1)
interface = m.group(2)
result.setdefault(router, dict())
result[router][interface] = doc['value']
return result
netconf_string = r.get('netconf:' + hostname)
if not netconf_string:
return Response(
response='no netconf available info for %r' % hostname,
status=404,
mimetype='text/html')
snmp_data_string = r.get('snmp-interfaces:' + hostname)
if not snmp_data_string:
return Response(
response='no snmp available info for %r' % hostname,
status=404,
mimetype='text/html')
snmp_indexes = {}
for ifc in json.loads(snmp_data_string.decode('utf-8')):
snmp_indexes[ifc['name']] = ifc['index']
interfaces = list(juniper.list_interfaces(
etree.XML(netconf_string.decode('utf-8'))))
if not interfaces:
return Response(
response='no interfaces found for %r' % hostname,
status=404,
mimetype='text/html')
result = []
for ifc in interfaces:
if not ifc['description']:
def _load_services(hostname=None):
result = dict()
key_pattern = f'opsdb:interface_services:{hostname}:*' \
if hostname else 'opsdb:interface_services:*'
def _service_params(full_service_info):
return {
'id': full_service_info['id'],
'name': full_service_info['name'],
'type': full_service_info['service_type'],
'status': full_service_info['status']
}
for doc in common.load_json_docs(
config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'],
key_pattern=key_pattern,
num_threads=20):
m = re.match(r'^opsdb:interface_services:([^:]+):(.+)', doc['key'])
if not m:
logger.warning(f'cannot parse redis service key {doc["key"]}')
# there are some weird records (dtn*, dp1*)
continue
snmp_index = snmp_indexes.get(ifc['name'], None)
if not snmp_index:
router = m.group(1)
interface = m.group(2)
result.setdefault(router, dict())
result[router][interface] = [_service_params(s) for s in doc['value']]
return result
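
The result is keyed the same way, with a list of trimmed service records per
interface (illustrative values):

{
    'mx1.ams.nl.geant.net': {
        'ae15.1500': [
            {'id': 12345, 'name': 'EXAMPLE-SERVICE',
             'type': 'geant ip', 'status': 'operational'}
        ]
    }
}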
def _load_interfaces(hostname):
key_pattern = f'netconf:{hostname}' if hostname else 'netconf:*'
for doc in common.load_xml_docs(
config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'],
key_pattern=key_pattern,
num_threads=10):
router = doc['key'][len('netconf:'):]
for ifc in juniper.list_interfaces(doc['value']):
if not ifc['description']:
continue
yield {
'router': router,
'name': ifc['name'],
'bundle': ifc['bundle'],
'bundle-parents': [],
'snmp-index': -1,
'description': ifc['description'],
'circuits': []
}
def _load_poller_interfaces(hostname=None):
snmp_indexes = _load_snmp_indexes(hostname)
bundles = _load_interface_bundles(hostname)
services = _load_services(hostname)
for ifc in _load_interfaces(hostname):
router_snmp = snmp_indexes.get(ifc['router'], None)
if not router_snmp or ifc['name'] not in router_snmp:
# there's no way to poll this interface
continue
ifc['snmp-index'] = router_snmp[ifc['name']]
bundle_parents = r.get('netconf-interface-bundles:%s:%s' % (
hostname, ifc['name'].split('.')[0]))
ifc_data = {
'name': ifc['name'],
'bundle': ifc['bundle'],
'bundle-parents':
json.loads(bundle_parents) if bundle_parents else [],
'snmp-index': snmp_index,
'description': ifc['description'],
'circuits': []
}
router_bundle = bundles.get(ifc['router'], None)
if router_bundle:
base_ifc = ifc['name'].split('.')[0]
ifc['bundle-parents'] = router_bundle.get(base_ifc, [])
router_services = services.get(ifc['router'], None)
if router_services:
ifc['circuits'] = router_services.get(ifc['name'], [])
yield ifc
@routes.route("/interfaces", methods=['GET', 'POST'])
@routes.route('/interfaces/<hostname>', methods=['GET', 'POST'])
@common.require_accepts_json
def poller_interface_oids(hostname=None):
cache_key = f'classifier-cache:poller-interfaces:{hostname}' \
if hostname else 'classifier-cache:poller-interfaces:all'
r = common.get_current_redis()
result = r.get(cache_key)
if result:
result = result.decode('utf-8')
else:
result = list(_load_poller_interfaces(hostname))
if not result:
return Response(
response='no interfaces found',
status=404,
mimetype='text/html')
result = json.dumps(result)
# cache this data for the next call
r.set(cache_key, result.encode('utf-8'))
circuits = r.get(
'opsdb:interface_services:%s:%s' % (hostname, ifc['name']))
if circuits:
ifc_data['circuits'] = [
{
'id': c['id'],
'name': c['name'],
'type': c['service_type'],
'status': c['status']
} for c in json.loads(circuits.decode('utf-8'))
]
result.append(ifc_data)
return jsonify(result)
return Response(result, mimetype="application/json")
@routes.route('/services/<category>', methods=['GET', 'POST'])
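
For reference, each element of the JSON list served by poller_interface_oids
has this shape (field names from the code above; values illustrative):

{
    'router': 'mx1.ams.nl.geant.net',
    'name': 'ae15.1500',
    'bundle': [],
    'bundle-parents': ['xe-0/0/0', 'xe-0/0/1'],
    'snmp-index': 611,
    'description': 'SRV_GLOBAL CUSTOMER EXAMPLE',
    'circuits': [{'id': 12345, 'name': 'EXAMPLE-SERVICE',
                  'type': 'geant ip', 'status': 'operational'}]
}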
@@ -53,3 +53,10 @@ def test_service_category_not_found(client, mocked_worker_module, category):
f'/poller/services/{category}',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 404
def test_get_all_interfaces(client):
rv = client.get(
'/poller/interfaces',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
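
A companion test for the per-router form could follow the same pattern (a
sketch; the hostname must exist in the test fixture data):

def test_get_interfaces_for_one_router(client):
    rv = client.get(
        '/poller/interfaces/mx1.ams.nl.geant.net',  # illustrative hostname
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200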