Skip to content
Snippets Groups Projects
Commit 2f0eb28b authored by Release Webservice's avatar Release Webservice
Browse files

Finished release 0.18.

parents 23ff3aed 67d8681e
No related branches found
No related tags found
No related merge requests found
Showing
with 240 additions and 85 deletions
...@@ -32,3 +32,6 @@ ...@@ -32,3 +32,6 @@
0.17: UAT RC2 0.17: UAT RC2
add all interfaces to poller/interfaces response add all interfaces to poller/interfaces response
always use config file for celery startup always use config file for celery startup
0.18: UAT RC3
current/next database latching
...@@ -49,9 +49,14 @@ CONFIG_SCHEMA = { ...@@ -49,9 +49,14 @@ CONFIG_SCHEMA = {
"port": {"type": "integer"}, "port": {"type": "integer"},
"name": {"type": "string"} "name": {"type": "string"}
}, },
"required": ["hostname", "port"], "required": ["hostname", "port", "name"],
"additionalProperties": False "additionalProperties": False
}, },
"redis-databases": {
"type": "array",
"minItems": 1,
"items": {"type": "integer"}
},
"junosspace": { "junosspace": {
"api": {"type": "string"}, "api": {"type": "string"},
"username": {"type": "string"}, "username": {"type": "string"},
...@@ -64,6 +69,7 @@ CONFIG_SCHEMA = { ...@@ -64,6 +69,7 @@ CONFIG_SCHEMA = {
"ops-db", "ops-db",
"ssh", "ssh",
"redis", "redis",
"redis-databases",
"junosspace"] "junosspace"]
}, },
{ {
...@@ -71,6 +77,7 @@ CONFIG_SCHEMA = { ...@@ -71,6 +77,7 @@ CONFIG_SCHEMA = {
"ops-db", "ops-db",
"ssh", "ssh",
"sentinel", "sentinel",
"redis-databases",
"junosspace"] "junosspace"]
} }
], ],
......
...@@ -50,7 +50,7 @@ def base_interface_name(interface): ...@@ -50,7 +50,7 @@ def base_interface_name(interface):
def related_interfaces(hostname, interface): def related_interfaces(hostname, interface):
r = common.get_redis() r = common.get_current_redis()
prefix = 'netconf-interfaces:%s:' % hostname prefix = 'netconf-interfaces:%s:' % hostname
for k in r.keys(prefix + base_interface_name(interface) + '.*'): for k in r.keys(prefix + base_interface_name(interface) + '.*'):
k = k.decode('utf-8') k = k.decode('utf-8')
...@@ -63,7 +63,7 @@ def related_interfaces(hostname, interface): ...@@ -63,7 +63,7 @@ def related_interfaces(hostname, interface):
methods=['GET', 'POST']) methods=['GET', 'POST'])
@common.require_accepts_json @common.require_accepts_json
def get_juniper_link_info(source_equipment, interface): def get_juniper_link_info(source_equipment, interface):
r = common.get_redis() r = common.get_current_redis()
cache_key = 'classifier-cache:juniper:%s:%s' % ( cache_key = 'classifier-cache:juniper:%s:%s' % (
source_equipment, interface) source_equipment, interface)
...@@ -137,7 +137,7 @@ def ix_peering_info(peer_info): ...@@ -137,7 +137,7 @@ def ix_peering_info(peer_info):
protocol = type(address).__name__ protocol = type(address).__name__
keyword = description.split(' ')[0] # regex needed??? (e.g. tabs???) keyword = description.split(' ')[0] # regex needed??? (e.g. tabs???)
r = common.get_redis() r = common.get_current_redis()
for k in r.keys('ix_public_peer:*'): for k in r.keys('ix_public_peer:*'):
other = r.get(k.decode('utf-8')).decode('utf-8') other = r.get(k.decode('utf-8')).decode('utf-8')
...@@ -165,7 +165,7 @@ def find_interfaces(address): ...@@ -165,7 +165,7 @@ def find_interfaces(address):
:param address: an ipaddress object :param address: an ipaddress object
:return: :return:
""" """
r = common.get_redis() r = common.get_current_redis()
for k in r.keys('reverse_interface_addresses:*'): for k in r.keys('reverse_interface_addresses:*'):
info = r.get(k.decode('utf-8')).decode('utf-8') info = r.get(k.decode('utf-8')).decode('utf-8')
info = json.loads(info) info = json.loads(info)
...@@ -187,7 +187,7 @@ def find_interfaces_and_services(address_str): ...@@ -187,7 +187,7 @@ def find_interfaces_and_services(address_str):
raise ClassifierProcessingError( raise ClassifierProcessingError(
'unable to parse %r as an ip address' % address_str) 'unable to parse %r as an ip address' % address_str)
r = common.get_redis() r = common.get_current_redis()
for interface in find_interfaces(address): for interface in find_interfaces(address):
services = r.get( services = r.get(
...@@ -209,7 +209,7 @@ def find_interfaces_and_services(address_str): ...@@ -209,7 +209,7 @@ def find_interfaces_and_services(address_str):
@common.require_accepts_json @common.require_accepts_json
def peer_info(address): def peer_info(address):
r = common.get_redis() r = common.get_current_redis()
cache_key = 'classifier-cache:peer:%s' % address cache_key = 'classifier-cache:peer:%s' % address
...@@ -257,7 +257,7 @@ def get_trap_metadata(source_equipment, interface, circuit_id): ...@@ -257,7 +257,7 @@ def get_trap_metadata(source_equipment, interface, circuit_id):
cache_key = 'classifier-cache:infinera:%s:%s' % ( cache_key = 'classifier-cache:infinera:%s:%s' % (
source_equipment, interface) source_equipment, interface)
r = common.get_redis() r = common.get_current_redis()
result = r.get(cache_key) result = r.get(cache_key)
if result: if result:
...@@ -294,7 +294,7 @@ def get_trap_metadata(source_equipment, interface, circuit_id): ...@@ -294,7 +294,7 @@ def get_trap_metadata(source_equipment, interface, circuit_id):
methods=['GET', 'POST']) methods=['GET', 'POST'])
@common.require_accepts_json @common.require_accepts_json
def get_coriant_info(equipment_name, entity_string): def get_coriant_info(equipment_name, entity_string):
r = common.get_redis() r = common.get_current_redis()
cache_key = 'classifier-cache:coriant:%s:%s' % ( cache_key = 'classifier-cache:coriant:%s:%s' % (
equipment_name, entity_string) equipment_name, entity_string)
...@@ -306,7 +306,7 @@ def get_coriant_info(equipment_name, entity_string): ...@@ -306,7 +306,7 @@ def get_coriant_info(equipment_name, entity_string):
m = re.match(r'^(\d+\-\d+)\.(\d+)', entity_string) m = re.match(r'^(\d+\-\d+)\.(\d+)', entity_string)
if not m: if not m:
logger.warning( logger.error(
'invalid coriant entity string format: %r' % entity_string) 'invalid coriant entity string format: %r' % entity_string)
return Response( return Response(
response="no available info for '{}' '{}'".format( response="no available info for '{}' '{}'".format(
...@@ -327,6 +327,9 @@ def get_coriant_info(equipment_name, entity_string): ...@@ -327,6 +327,9 @@ def get_coriant_info(equipment_name, entity_string):
if path: if path:
result['path'] = path result['path'] = path
else:
logger.error('no path found for {}:{}'.format(
equipment_name, entity_string))
# cache this data for the next call # cache this data for the next call
result = json.dumps(result).encode('utf-8') result = json.dumps(result).encode('utf-8')
......
...@@ -2,16 +2,33 @@ import functools ...@@ -2,16 +2,33 @@ import functools
import logging import logging
from flask import request, Response, current_app, g from flask import request, Response, current_app, g
from inventory_provider.tasks.common import get_redis as tasks_get_redis from inventory_provider.tasks import common as tasks_common
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def get_redis(): def get_current_redis():
if 'redis_db' not in g: if 'current_redis_db' in g:
config = current_app.config['INVENTORY_PROVIDER_CONFIG'] latch = tasks_common.get_latch(g.current_redis_db)
g.redis_db = tasks_get_redis(config) if latch and latch['current'] == latch['this']:
return g.redis_db return g.current_redis_db
logger.warning('switching to current redis db')
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
g.current_redis_db = tasks_common.get_current_redis(config)
return g.current_redis_db
def get_next_redis():
if 'next_redis_db' in g:
latch = tasks_common.get_latch(g.next_redis_db)
if latch and latch['next'] == latch['this']:
return g.next_redis_db
logger.warning('switching to next redis db')
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
g.next_redis_db = tasks_common.get_next_redis(config)
return g.next_redis_db
def require_accepts_json(f): def require_accepts_json(f):
......
...@@ -18,7 +18,7 @@ def after_request(resp): ...@@ -18,7 +18,7 @@ def after_request(resp):
@routes.route("/routers", methods=['GET', 'POST']) @routes.route("/routers", methods=['GET', 'POST'])
@common.require_accepts_json @common.require_accepts_json
def routers(): def routers():
r = common.get_redis() r = common.get_current_redis()
result = [] result = []
for k in r.keys('netconf:*'): for k in r.keys('netconf:*'):
m = re.match('^netconf:(.+)$', k.decode('utf-8')) m = re.match('^netconf:(.+)$', k.decode('utf-8'))
...@@ -30,7 +30,7 @@ def routers(): ...@@ -30,7 +30,7 @@ def routers():
@routes.route("/interfaces/<hostname>", methods=['GET', 'POST']) @routes.route("/interfaces/<hostname>", methods=['GET', 'POST'])
@common.require_accepts_json @common.require_accepts_json
def router_interfaces(hostname): def router_interfaces(hostname):
r = common.get_redis() r = common.get_current_redis()
interfaces = [] interfaces = []
for k in r.keys('netconf-interfaces:%s:*' % hostname): for k in r.keys('netconf-interfaces:%s:*' % hostname):
ifc = r.get(k.decode('utf-8')) ifc = r.get(k.decode('utf-8'))
......
from flask import Blueprint, current_app, jsonify from flask import Blueprint, current_app, jsonify
from inventory_provider.tasks import worker from inventory_provider.tasks import worker
from inventory_provider.tasks import common as worker_common
from inventory_provider.routes import common from inventory_provider.routes import common
routes = Blueprint("inventory-data-job-routes", __name__) routes = Blueprint("inventory-data-job-routes", __name__)
...@@ -36,3 +37,28 @@ def reload_router_config(equipment_name): ...@@ -36,3 +37,28 @@ def reload_router_config(equipment_name):
@common.require_accepts_json @common.require_accepts_json
def check_task_status(task_id): def check_task_status(task_id):
return jsonify(worker.check_task_status(task_id)) return jsonify(worker.check_task_status(task_id))
@routes.route("latchdb", methods=['GET', 'POST'])
def latch_db():
config = current_app.config["INVENTORY_PROVIDER_CONFIG"]
db_ids = config['redis-databases']
db_ids = sorted(set(db_ids))
r = worker_common.get_next_redis(config)
latch = worker_common.get_latch(r)
if not latch:
latch = {
'current': db_ids[0],
'next': db_ids[0]
}
next_idx = db_ids.index(latch['next'])
next_idx = (next_idx + 1) % len(db_ids)
worker_common.set_latch(
config, new_current=latch['next'], new_next=db_ids[next_idx])
r = worker_common.get_current_redis(config)
return jsonify(worker_common.get_latch(r))
...@@ -16,7 +16,7 @@ def after_request(resp): ...@@ -16,7 +16,7 @@ def after_request(resp):
@routes.route('/interfaces/<hostname>', methods=['GET', 'POST']) @routes.route('/interfaces/<hostname>', methods=['GET', 'POST'])
@common.require_accepts_json @common.require_accepts_json
def poller_interface_oids(hostname): def poller_interface_oids(hostname):
r = common.get_redis() r = common.get_current_redis()
netconf_string = r.get('netconf:' + hostname) netconf_string = r.get('netconf:' + hostname)
if not netconf_string: if not netconf_string:
......
...@@ -14,7 +14,7 @@ routes = Blueprint("inventory-data-testing-support-routes", __name__) ...@@ -14,7 +14,7 @@ routes = Blueprint("inventory-data-testing-support-routes", __name__)
@routes.route("flushdb", methods=['GET', 'POST']) @routes.route("flushdb", methods=['GET', 'POST'])
def flushdb(): def flushdb():
common.get_redis().flushdb() common.get_current_redis().flushdb()
return Response('OK') return Response('OK')
...@@ -46,7 +46,7 @@ def update_interface_statuses(): ...@@ -46,7 +46,7 @@ def update_interface_statuses():
@common.require_accepts_json @common.require_accepts_json
def juniper_addresses(): def juniper_addresses():
# TODO: this route (and corant, infinera routes) can be removed # TODO: this route (and corant, infinera routes) can be removed
r = common.get_redis() r = common.get_current_redis()
routers = [] routers = []
for k in r.keys('junosspace:*'): for k in r.keys('junosspace:*'):
info = r.get(k.decode('utf-8')) info = r.get(k.decode('utf-8'))
...@@ -58,7 +58,7 @@ def juniper_addresses(): ...@@ -58,7 +58,7 @@ def juniper_addresses():
@routes.route("opsdb/interfaces") @routes.route("opsdb/interfaces")
def get_all_interface_details(): def get_all_interface_details():
r = common.get_redis() r = common.get_current_redis()
result = collections.defaultdict(list) result = collections.defaultdict(list)
for k in r.keys('opsdb:interface_services:*'): for k in r.keys('opsdb:interface_services:*'):
m = re.match( m = re.match(
...@@ -71,7 +71,7 @@ def get_all_interface_details(): ...@@ -71,7 +71,7 @@ def get_all_interface_details():
@routes.route("opsdb/interfaces/<equipment_name>") @routes.route("opsdb/interfaces/<equipment_name>")
def get_interface_details_for_equipment(equipment_name): def get_interface_details_for_equipment(equipment_name):
r = common.get_redis() r = common.get_current_redis()
result = [] result = []
for k in r.keys('opsdb:interface_services:%s:*' % equipment_name): for k in r.keys('opsdb:interface_services:%s:*' % equipment_name):
m = re.match( m = re.match(
...@@ -84,7 +84,7 @@ def get_interface_details_for_equipment(equipment_name): ...@@ -84,7 +84,7 @@ def get_interface_details_for_equipment(equipment_name):
@routes.route("opsdb/interfaces/<equipment_name>/<path:interface>") @routes.route("opsdb/interfaces/<equipment_name>/<path:interface>")
def get_interface_details(equipment_name, interface): def get_interface_details(equipment_name, interface):
r = common.get_redis() r = common.get_current_redis()
key = 'opsdb:interface_services:%s:%s' % (equipment_name, interface) key = 'opsdb:interface_services:%s:%s' % (equipment_name, interface)
# TODO: handle None (return 404) # TODO: handle None (return 404)
return jsonify(json.loads(r.get(key).decode('utf-8'))) return jsonify(json.loads(r.get(key).decode('utf-8')))
...@@ -92,7 +92,7 @@ def get_interface_details(equipment_name, interface): ...@@ -92,7 +92,7 @@ def get_interface_details(equipment_name, interface):
@routes.route("opsdb/equipment-location") @routes.route("opsdb/equipment-location")
def get_all_equipment_locations(): def get_all_equipment_locations():
r = common.get_redis() r = common.get_current_redis()
result = {} result = {}
for k in r.keys('opsdb:location:*'): for k in r.keys('opsdb:location:*'):
k = k.decode('utf-8') k = k.decode('utf-8')
...@@ -104,7 +104,7 @@ def get_all_equipment_locations(): ...@@ -104,7 +104,7 @@ def get_all_equipment_locations():
@routes.route("opsdb/equipment-location/<path:equipment_name>") @routes.route("opsdb/equipment-location/<path:equipment_name>")
def get_equipment_location(equipment_name): def get_equipment_location(equipment_name):
r = common.get_redis() r = common.get_current_redis()
result = r.get('opsdb:location:' + equipment_name) result = r.get('opsdb:location:' + equipment_name)
# TODO: handle None (return 404) # TODO: handle None (return 404)
return jsonify(json.loads(result.decode('utf-8'))) return jsonify(json.loads(result.decode('utf-8')))
...@@ -112,7 +112,7 @@ def get_equipment_location(equipment_name): ...@@ -112,7 +112,7 @@ def get_equipment_location(equipment_name):
@routes.route("opsdb/circuit-hierarchy/children/<int:parent_id>") @routes.route("opsdb/circuit-hierarchy/children/<int:parent_id>")
def get_children(parent_id): def get_children(parent_id):
r = common.get_redis() r = common.get_current_redis()
result = r.get('opsdb:services:children:%d' % parent_id) result = r.get('opsdb:services:children:%d' % parent_id)
# TODO: handle None (return 404) # TODO: handle None (return 404)
return jsonify(json.loads(result.decode('utf-8'))) return jsonify(json.loads(result.decode('utf-8')))
...@@ -120,7 +120,7 @@ def get_children(parent_id): ...@@ -120,7 +120,7 @@ def get_children(parent_id):
@routes.route("opsdb/circuit-hierarchy/parents/<int:child_id>") @routes.route("opsdb/circuit-hierarchy/parents/<int:child_id>")
def get_parents(child_id): def get_parents(child_id):
r = common.get_redis() r = common.get_current_redis()
result = r.get('opsdb:services:parents:%d' % child_id) result = r.get('opsdb:services:parents:%d' % child_id)
# TODO: handle None (return 404) # TODO: handle None (return 404)
return jsonify(json.loads(result.decode('utf-8'))) return jsonify(json.loads(result.decode('utf-8')))
...@@ -129,7 +129,7 @@ def get_parents(child_id): ...@@ -129,7 +129,7 @@ def get_parents(child_id):
@routes.route("bgp/<hostname>", methods=['GET', 'POST']) @routes.route("bgp/<hostname>", methods=['GET', 'POST'])
@common.require_accepts_json @common.require_accepts_json
def bgp_configs(hostname): def bgp_configs(hostname):
r = common.get_redis() r = common.get_current_redis()
netconf_string = r.get('netconf:' + hostname) netconf_string = r.get('netconf:' + hostname)
if not netconf_string: if not netconf_string:
return Response( return Response(
...@@ -151,7 +151,7 @@ def bgp_configs(hostname): ...@@ -151,7 +151,7 @@ def bgp_configs(hostname):
@routes.route("snmp/<hostname>", methods=['GET', 'POST']) @routes.route("snmp/<hostname>", methods=['GET', 'POST'])
@common.require_accepts_json @common.require_accepts_json
def snmp_ids(hostname): def snmp_ids(hostname):
r = common.get_redis() r = common.get_next_redis()
ifc_data_string = r.get('snmp-interfaces:' + hostname) ifc_data_string = r.get('snmp-interfaces:' + hostname)
ifc_data = json.loads(ifc_data_string.decode('utf-8')) ifc_data = json.loads(ifc_data_string.decode('utf-8'))
return jsonify(ifc_data) return jsonify(ifc_data)
......
import json
import logging
import jsonschema
import redis import redis
import redis.sentinel import redis.sentinel
logger = logging.getLogger(__name__)
DB_LATCH_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"current": {"type": "integer"},
"next": {"type": "integer"},
"this": {"type": "integer"}
},
"required": ["current", "next", "this"],
"additionalProperties": False
}
def get_latch(r):
latch = r.get('db:latch')
if latch is None:
logger.error('no latch key found in db')
return None
try:
latch = json.loads(latch.decode('utf-8'))
jsonschema.validate(latch, DB_LATCH_SCHEMA)
except (jsonschema.ValidationError, json.JSONDecodeError):
logging.exception('error validating latch value')
return None
return latch
def set_latch(config, new_current, new_next):
for db in config['redis-databases']:
latch = {
'current': new_current,
'next': new_next,
'this': db
}
r = _get_redis(config, dbid=db)
r.set('db:latch', json.dumps(latch))
def _get_redis(config, dbid=None):
if dbid is None:
logger.debug('no db specified, using minimum as first guess')
dbid = min(config['redis-databases'])
if dbid not in config['redis-databases']:
logger.error('tried to connect to unknown db id: {}'.format(dbid))
dbid = min(config['redis-databases'])
kwargs = {
'db': dbid,
'socket_timeout': 0.1
}
def get_redis(config):
if 'sentinel' in config: if 'sentinel' in config:
sentinel = redis.sentinel.Sentinel([( sentinel = redis.sentinel.Sentinel([(
config['sentinel']['hostname'], config['sentinel']['hostname'],
config['sentinel']['port'])], config['sentinel']['port'])],
socket_timeout=0.1) **kwargs)
return sentinel.master_for( return sentinel.master_for(
config['sentinel']['name'], config['sentinel']['name'],
socket_timeout=0.1) socket_timeout=0.1)
else: else:
return redis.StrictRedis( return redis.StrictRedis(
host=config['redis']['hostname'], host=config['redis']['hostname'],
port=config['redis']['port']) port=config['redis']['port'],
**kwargs)
def get_current_redis(config):
r = _get_redis(config)
latch = get_latch(r)
if not latch:
logger.warning("can't determine current db")
return r
if latch['this'] == latch['current']:
return r
else:
return _get_redis(config, latch['current'])
def get_next_redis(config):
r = _get_redis(config)
latch = get_latch(r)
if latch and latch['this'] == latch['next']:
return r
if latch and latch['next'] in config['redis-databases']:
next_id = latch['next']
else:
logger.warning("next db not configured, deriving default value")
db_ids = sorted(set(config['redis-databases']))
next_id = db_ids[0] if len(db_ids) == 1 else db_ids[1]
return _get_redis(config, next_id)
...@@ -10,7 +10,7 @@ from collections import defaultdict ...@@ -10,7 +10,7 @@ from collections import defaultdict
from lxml import etree from lxml import etree
from inventory_provider.tasks.app import app from inventory_provider.tasks.app import app
from inventory_provider.tasks.common import get_redis from inventory_provider.tasks.common import get_next_redis
from inventory_provider import config from inventory_provider import config
from inventory_provider import environment from inventory_provider import environment
from inventory_provider.db import db, opsdb from inventory_provider.db import db, opsdb
...@@ -55,38 +55,16 @@ class InventoryTask(Task): ...@@ -55,38 +55,16 @@ class InventoryTask(Task):
super().update_state(**kwargs) super().update_state(**kwargs)
def _save_value(key, value):
assert isinstance(value, str), \
"sanity failure: expected string data as value"
r = get_redis(InventoryTask.config)
r.set(name=key, value=value)
# InventoryTask.logger.debug("saved %s" % key)
return "OK"
def _save_value_json(key, data_obj):
_save_value(
key,
json.dumps(data_obj))
def _save_value_etree(key, xml_doc):
_save_value(
key,
etree.tostring(xml_doc, encoding='unicode'))
@app.task @app.task
def snmp_refresh_interfaces(hostname, community): def snmp_refresh_interfaces(hostname, community):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.debug( logger.debug(
'>>> snmp_refresh_interfaces(%r, %r)' % (hostname, community)) '>>> snmp_refresh_interfaces(%r, %r)' % (hostname, community))
_save_value_json( value = list(snmp.get_router_snmp_indexes(hostname, community))
'snmp-interfaces:' + hostname,
list(snmp.get_router_snmp_indexes( r = get_next_redis(InventoryTask.config)
hostname, r.set('snmp-interfaces:' + hostname, json.dumps(value))
community)))
logger.debug( logger.debug(
'<<< snmp_refresh_interfaces(%r, %r)' % (hostname, community)) '<<< snmp_refresh_interfaces(%r, %r)' % (hostname, community))
...@@ -97,9 +75,11 @@ def netconf_refresh_config(hostname): ...@@ -97,9 +75,11 @@ def netconf_refresh_config(hostname):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.debug('>>> netconf_refresh_config(%r)' % hostname) logger.debug('>>> netconf_refresh_config(%r)' % hostname)
_save_value_etree( netconf_doc = juniper.load_config(hostname, InventoryTask.config["ssh"])
'netconf:' + hostname, netconf_str = etree.tostring(netconf_doc, encoding='unicode')
juniper.load_config(hostname, InventoryTask.config["ssh"]))
r = get_next_redis(InventoryTask.config)
r.set('netconf:' + hostname, netconf_str)
logger.debug('<<< netconf_refresh_config(%r)' % hostname) logger.debug('<<< netconf_refresh_config(%r)' % hostname)
...@@ -116,7 +96,7 @@ def update_interfaces_to_services(): ...@@ -116,7 +96,7 @@ def update_interfaces_to_services():
service['equipment'], service['interface_name']) service['equipment'], service['interface_name'])
interface_services[equipment_interface].append(service) interface_services[equipment_interface].append(service)
r = get_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
for key in r.scan_iter('opsdb:interface_services:*'): for key in r.scan_iter('opsdb:interface_services:*'):
r.delete(key) r.delete(key)
for equipment_interface, services in interface_services.items(): for equipment_interface, services in interface_services.items():
...@@ -132,7 +112,7 @@ def update_equipment_locations(): ...@@ -132,7 +112,7 @@ def update_equipment_locations():
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.debug('>>> update_equipment_locations') logger.debug('>>> update_equipment_locations')
r = get_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
for k in r.scan_iter('opsdb:location:*'): for k in r.scan_iter('opsdb:location:*'):
r.delete(k) r.delete(k)
...@@ -161,7 +141,7 @@ def update_circuit_hierarchy(): ...@@ -161,7 +141,7 @@ def update_circuit_hierarchy():
parent_to_children[parent_id].append(relation) parent_to_children[parent_id].append(relation)
child_to_parents[child_id].append(relation) child_to_parents[child_id].append(relation)
r = get_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
for key in r.scan_iter('opsdb:services:parents:*'): for key in r.scan_iter('opsdb:services:parents:*'):
r.delete(key) r.delete(key)
for cid, parents in child_to_parents.items(): for cid, parents in child_to_parents.items():
...@@ -180,7 +160,7 @@ def update_geant_lambdas(): ...@@ -180,7 +160,7 @@ def update_geant_lambdas():
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.debug('>>> update_geant_lambdas') logger.debug('>>> update_geant_lambdas')
r = get_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
for key in r.scan_iter('opsdb:geant_lambdas:*'): for key in r.scan_iter('opsdb:geant_lambdas:*'):
r.delete(key) r.delete(key)
with db.connection(InventoryTask.config["ops-db"]) as cx: with db.connection(InventoryTask.config["ops-db"]) as cx:
...@@ -203,7 +183,7 @@ def update_junosspace_device_list(self): ...@@ -203,7 +183,7 @@ def update_junosspace_device_list(self):
'message': 'querying junosspace for managed routers' 'message': 'querying junosspace for managed routers'
}) })
r = get_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
routers = {} routers = {}
for d in juniper.load_routers_from_junosspace( for d in juniper.load_routers_from_junosspace(
...@@ -237,7 +217,7 @@ def load_netconf_data(hostname): ...@@ -237,7 +217,7 @@ def load_netconf_data(hostname):
:param hostname: :param hostname:
:return: :return:
""" """
r = get_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
netconf = r.get('netconf:' + hostname) netconf = r.get('netconf:' + hostname)
if not netconf: if not netconf:
raise InventoryTaskError('no netconf data found for %r' % hostname) raise InventoryTaskError('no netconf data found for %r' % hostname)
...@@ -252,7 +232,7 @@ def clear_cached_classifier_responses(hostname=None): ...@@ -252,7 +232,7 @@ def clear_cached_classifier_responses(hostname=None):
else: else:
logger.debug('removing all cached classifier responses') logger.debug('removing all cached classifier responses')
r = get_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
def _hostname_keys(): def _hostname_keys():
for k in r.keys('classifier-cache:juniper:%s:*' % hostname): for k in r.keys('classifier-cache:juniper:%s:*' % hostname):
...@@ -281,7 +261,7 @@ def _refresh_peers(hostname, key_base, peers): ...@@ -281,7 +261,7 @@ def _refresh_peers(hostname, key_base, peers):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.debug( logger.debug(
'removing cached %s for %r' % (key_base, hostname)) 'removing cached %s for %r' % (key_base, hostname))
r = get_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
for k in r.keys(key_base + ':*'): for k in r.keys(key_base + ':*'):
# potential race condition: another proc could have # potential race condition: another proc could have
# delete this element between the time we read the # delete this element between the time we read the
...@@ -325,7 +305,7 @@ def refresh_juniper_interface_list(hostname, netconf): ...@@ -325,7 +305,7 @@ def refresh_juniper_interface_list(hostname, netconf):
logger.debug( logger.debug(
'removing cached netconf-interfaces for %r' % hostname) 'removing cached netconf-interfaces for %r' % hostname)
r = get_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
for k in r.keys('netconf-interfaces:%s:*' % hostname): for k in r.keys('netconf-interfaces:%s:*' % hostname):
r.delete(k) r.delete(k)
...@@ -433,7 +413,7 @@ def reload_router_config(self, hostname): ...@@ -433,7 +413,7 @@ def reload_router_config(self, hostname):
def _derive_router_hostnames(config): def _derive_router_hostnames(config):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
r = get_redis(config) r = get_next_redis(config)
junosspace_equipment = set() junosspace_equipment = set()
for k in r.keys('junosspace:*'): for k in r.keys('junosspace:*'):
m = re.match('^junosspace:(.*)$', k.decode('utf-8')) m = re.match('^junosspace:(.*)$', k.decode('utf-8'))
......
...@@ -2,7 +2,7 @@ from setuptools import setup, find_packages ...@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup( setup(
name='inventory-provider', name='inventory-provider',
version="0.17", version="0.18",
author='GEANT', author='GEANT',
author_email='swd@geant.org', author_email='swd@geant.org',
description='Dashboard inventory provider', description='Dashboard inventory provider',
......
...@@ -39,6 +39,7 @@ def data_config_filename(): ...@@ -39,6 +39,7 @@ def data_config_filename():
"hostname": "xxxxxx", "hostname": "xxxxxx",
"port": 6379 "port": 6379
}, },
"redis-databases": [0, 7],
"junosspace": { "junosspace": {
"api": "bogus-url", "api": "bogus-url",
"username": "bogus-username", "username": "bogus-username",
...@@ -75,6 +76,11 @@ class MockedRedis(object): ...@@ -75,6 +76,11 @@ class MockedRedis(object):
"router-info.json") "router-info.json")
with open(test_data_filename) as f: with open(test_data_filename) as f:
MockedRedis.db = json.loads(f.read()) MockedRedis.db = json.loads(f.read())
MockedRedis.db['db:latch'] = json.dumps({
'current': 0,
'next': 0,
'this': 0
})
def set(self, name, value): def set(self, name, value):
MockedRedis.db[name] = value MockedRedis.db[name] = value
......
...@@ -3,12 +3,16 @@ just checks that the worker methods call the right functions ...@@ -3,12 +3,16 @@ just checks that the worker methods call the right functions
and some data ends up in the right place ... otherwise not very detailed and some data ends up in the right place ... otherwise not very detailed
""" """
from inventory_provider.tasks import worker from inventory_provider.tasks import worker
from inventory_provider.tasks.common import get_redis from inventory_provider.tasks.common import _get_redis
def backend_db(): def backend_db():
return get_redis({ return _get_redis({
'redis': {'hostname': None, 'port': None} 'redis': {
'hostname': None,
'port': None
},
'redis-databases': [0, 7]
}).db }).db
......
from inventory_provider.tasks import worker from inventory_provider.tasks import worker
from inventory_provider.tasks.common import get_redis from inventory_provider.tasks.common import _get_redis
def backend_db(): def backend_db():
return get_redis({ return _get_redis({
'redis': {'hostname': None, 'port': None} 'redis': {
'hostname': None,
'port': None
},
'redis-databases': [0, 7]
}).db }).db
......
...@@ -6,12 +6,16 @@ import contextlib ...@@ -6,12 +6,16 @@ import contextlib
from inventory_provider.tasks import worker from inventory_provider.tasks import worker
from inventory_provider.tasks.common import get_redis from inventory_provider.tasks.common import _get_redis
def backend_db(): def backend_db():
return get_redis({ return _get_redis({
'redis': {'hostname': None, 'port': None} 'redis': {
'hostname': None,
'port': None
},
'redis-databases': [0, 7]
}).db }).db
...@@ -20,7 +24,7 @@ def _mocked_connection(x): ...@@ -20,7 +24,7 @@ def _mocked_connection(x):
yield x yield x
def test_update_locations(mocker, mocked_worker_module): def test_update_locations(mocker, mocked_worker_module, mocked_redis):
mocker.patch( mocker.patch(
'inventory_provider.db.opsdb.lookup_pop_info', 'inventory_provider.db.opsdb.lookup_pop_info',
......
import json import json
import jsonschema import jsonschema
from inventory_provider.tasks.common import DB_LATCH_SCHEMA
DEFAULT_REQUEST_HEADERS = { DEFAULT_REQUEST_HEADERS = {
"Content-type": "application/json", "Content-type": "application/json",
"Accept": ["application/json"] "Accept": ["application/json"]
...@@ -123,3 +124,13 @@ def test_check_task_status_exception(client, mocker): ...@@ -123,3 +124,13 @@ def test_check_task_status_exception(client, mocker):
assert not status['success'] assert not status['success']
assert status['result']['error type'] == 'AssertionError' assert status['result']['error type'] == 'AssertionError'
assert status['result']['message'] == 'test error message' assert status['result']['message'] == 'test error message'
def test_latchdb(client, mocked_redis):
rv = client.post(
'jobs/latchdb',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
latch = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(latch, DB_LATCH_SCHEMA)
...@@ -47,7 +47,8 @@ def test_router_hostname_derivation(mocked_redis): ...@@ -47,7 +47,8 @@ def test_router_hostname_derivation(mocked_redis):
'redis': { 'redis': {
'hostname': None, 'hostname': None,
'port': None 'port': None
} },
'redis-databases': [0, 11]
} }
hostnames = list(worker._derive_router_hostnames(config)) hostnames = list(worker._derive_router_hostnames(config))
assert hostnames # test data is non-empty assert hostnames # test data is non-empty
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment