Skip to content
Snippets Groups Projects
Commit 52f634ae authored by Erik Reid's avatar Erik Reid
Browse files

Finished feature improve-interface-list-performance.

parents 1aabfb64 64d72600
No related branches found
No related tags found
No related merge requests found
from collections import OrderedDict
import functools
import json
import logging
from collections import OrderedDict
import queue
import random
import threading
import requests
from flask import request, Response, current_app, g
......@@ -103,3 +107,92 @@ def after_request(response):
data,
str(response.status_code)))
return response
def _redis_client_proc(key_queue, value_queue, config_params):
"""
create a local redis connection with the current db index,
lookup the values of the keys that come from key_queue
and put them o=n value_queue
i/o contract:
None arriving on key_queue means no more keys are coming
put None in value_queue means we are finished
:param key_queue:
:param value_queue:
:param config_params: app config
:return: yields dicts like {'key': str, 'value': dict}
"""
try:
r = tasks_common.get_current_redis(config_params)
while True:
key = key_queue.get()
# contract is that None means no more requests
if not key:
break
value = r.get(key).decode('utf-8')
value_queue.put({
'key': key,
'value': json.loads(value)
})
except json.JSONDecodeError:
logger.exception(f'error decoding entry for {key}')
finally:
# contract is to return None when finished
value_queue.put(None)
def load_json_docs(config_params, key_pattern, num_threads=10):
"""
load all json docs from redis
the loading is done with multiple connections in parallel, since this
method is called from an api handler and when the client is far from
the redis master the cumulative latency causes nginx/gunicorn timeouts
:param config_params: app config
:param pattern: key pattern to load
:param num_threads: number of client threads to create
:return: yields dicts like {'key': str, 'value': dict}
"""
response_queue = queue.Queue()
threads = []
for _ in range(num_threads):
q = queue.Queue()
t = threading.Thread(
target=_redis_client_proc,
args=[q, response_queue, config_params])
t.start()
threads.append({'thread': t, 'queue': q})
r = tasks_common.get_current_redis(config_params)
# scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter(key_pattern, count=1000):
k = k.decode('utf-8')
t = random.choice(threads)
t['queue'].put(k)
# tell all threads there are no more keys coming
for t in threads:
t['queue'].put(None)
num_finished = 0
# read values from response_queue until we receive
# None len(threads) times
while num_finished < len(threads):
value = response_queue.get()
if not value:
num_finished += 1
logger.debug('one worker thread finished')
continue
yield value
# cleanup like we're supposed to, even though it's python
for t in threads:
t['thread'].join(timeout=0.5) # timeout, for sanity
......@@ -27,23 +27,41 @@ def routers():
return jsonify(result)
@routes.route("/interfaces", methods=['GET', 'POST'])
@routes.route("/interfaces/<hostname>", methods=['GET', 'POST'])
@common.require_accepts_json
def router_interfaces(hostname):
r = common.get_current_redis()
interfaces = []
for k in r.keys('netconf-interfaces:%s:*' % hostname):
ifc = r.get(k.decode('utf-8'))
if ifc:
interfaces.append(json.loads(ifc.decode('utf-8')))
def router_interfaces(hostname=None):
if not interfaces:
return Response(
response="no available interface info for '%s'" % hostname,
status=404,
mimetype="text/html")
cache_key = f'classifier-cache:netconf-interfaces:{hostname}' \
if hostname else 'classifier-cache:netconf-interfaces:all'
return jsonify(interfaces)
r = common.get_current_redis()
result = r.get(cache_key)
if result:
result = result.decode('utf-8')
else:
key_pattern = f'netconf-interfaces:{hostname}:*' \
if hostname else 'netconf-interfaces:*'
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
result = []
for ifc in common.load_json_docs(config, key_pattern):
key_fields = ifc['key'].split(':')
ifc['value']['router'] = key_fields[1]
result.append(ifc['value'])
if not result:
return Response(
response="no available interface info for '%s'" % hostname,
status=404,
mimetype="text/html")
result = json.dumps(result)
# cache this data for the next call
r.set(cache_key, result.encode('utf-8'))
return Response(result, mimetype="application/json")
@routes.route("/pop/<equipment_name>", methods=['GET', 'POST'])
......
......@@ -9,18 +9,19 @@
<body>
<div ng-controller="interfaces">
<h2>Interfaces</h2>
<div>
<select
ng-options="r for r in routers"
ng-change="update_interfaces()"
ng-model="router"></select>
<select
ng-options="i for i in interfaces"
ng-change="update_status()"
ng-model="interface">
</select>
</div>
<div class="column">
<p><strong>interfaces</strong></p>
<ul>
<li ng-repeat="i in interfaces">{{i.router}}:{{i.name}}
<ul>
<li>{{i.description}}</li>
<li ng-repeat="v4 in i.ipv4">v4: {{v4}}</li>
<li ng-repeat="v6 in i.ipv6">v6: {{v6}}</li>
</ul>
</li>
</ul>
<!--div class="raw">{{interfaces}}</div-->
</div>
<div>
STATUS: {{status}}
......
......@@ -12,12 +12,13 @@ myApp.controller('interfaces', function($scope, $http) {
$http({
method: 'GET',
url: window.location.origin + "/data/routers"
url: window.location.origin + "/data/interfaces"
}).then(
function(rsp) {$scope.routers = rsp.data;},
function(rsp) {$scope.interfaces = rsp.data;},
function(rsp) {$scope.routers = ['error'];}
);
/*
$scope.update_interfaces = function() {
$http({
......@@ -47,5 +48,6 @@ myApp.controller('interfaces', function($scope, $http) {
function(rsp) {$scope.interfaces = 'query error';}
);
}
*/
});
\ No newline at end of file
.column {
float: left;
width: 33.33%;
width: 100%%;
}
/* Clear floats after the columns */
......
......@@ -18,6 +18,7 @@ def test_router_interfaces(router, client):
"properties": {
"name": {"type": "string"},
"description": {"type": "string"},
"router": {"type": "string"},
"bundle": {
"type": "array",
"items": {"type": "string"}
......@@ -31,7 +32,7 @@ def test_router_interfaces(router, client):
"items": {"type": "string"}
}
},
"required": ["name", "description", "ipv4", "ipv6"],
"required": ["name", "description", "ipv4", "router", "ipv6"],
"additionalProperties": False
}
}
......
......@@ -75,3 +75,42 @@ def test_pop_not_found(client, mocker):
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 404
def test_router_interfaces_all(client):
interfaces_list_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"description": {"type": "string"},
"router": {"type": "string"},
"bundle": {
"type": "array",
"items": {"type": "string"}
},
"ipv4": {
"type": "array",
"items": {"type": "string"}
},
"ipv6": {
"type": "array",
"items": {"type": "string"}
}
},
"required": ["name", "description", "ipv4", "router", "ipv6"],
"additionalProperties": False
}
}
rv = client.post(
'/data/interfaces',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
response = json.loads(rv.data.decode("utf-8"))
jsonschema.validate(response, interfaces_list_schema)
assert response # at least shouldn't be empty
import json
import jsonschema
from inventory_provider.routes import common
DEFAULT_REQUEST_HEADERS = {
"Content-type": "application/json",
"Accept": ["application/json"]
......@@ -50,3 +52,46 @@ def test_version_request(client, mocked_redis):
jsonschema.validate(
json.loads(rv.data.decode("utf-8")),
version_schema)
def test_load_json_docs(data_config, mocked_redis):
INTERFACE_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"interface": {
"type": "object",
"properties": {
"name": {"type": "string"},
"description": {"type": "string"},
"bundle": {
"type": "array",
"items": {"type": "string"}
},
"ipv4": {
"type": "array",
"items": {"type": "string"}
},
"ipv6": {
"type": "array",
"items": {"type": "string"}
}
},
"required": ["name", "description", "ipv4", "ipv6"],
"additionalProperties": False
}
},
"type": "object",
"properties": {
"key": {"type": "string"},
"value": {"$ref": "#/definitions/interface"}
},
"required": ["key", "value"],
"additionalProperties": False
}
for ifc in common.load_json_docs(
data_config, 'netconf-interfaces:*', num_threads=20):
jsonschema.validate(ifc, INTERFACE_SCHEMA)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment