Skip to content
Snippets Groups Projects
Commit 2ba56a14 authored by Erik Reid's avatar Erik Reid
Browse files

initial flattening of redis structure

parent 97bf661f
Branches
Tags
No related merge requests found
...@@ -111,7 +111,6 @@ def load_config(hostname, ssh_params): ...@@ -111,7 +111,6 @@ def load_config(hostname, ssh_params):
:return: :return:
""" """
juniper_logger = logging.getLogger(JUNIPER_LOGGER_NAME) juniper_logger = logging.getLogger(JUNIPER_LOGGER_NAME)
config = _rpc(hostname, ssh_params).get_config()
juniper_logger.info("capturing netconf data for '%s'" % hostname) juniper_logger.info("capturing netconf data for '%s'" % hostname)
...@@ -124,6 +123,8 @@ def load_config(hostname, ssh_params): ...@@ -124,6 +123,8 @@ def load_config(hostname, ssh_params):
schema_doc = etree.XML(CONFIG_SCHEMA.encode('utf-8')) schema_doc = etree.XML(CONFIG_SCHEMA.encode('utf-8'))
config_schema = etree.XMLSchema(schema_doc) config_schema = etree.XMLSchema(schema_doc)
config = _rpc(hostname, ssh_params).get_config()
_validate(config_schema, config) _validate(config_schema, config)
# validate interfaces/interface/unit elements ... # validate interfaces/interface/unit elements ...
......
...@@ -12,8 +12,8 @@ routes = Blueprint("inventory-data-job-routes", __name__) ...@@ -12,8 +12,8 @@ routes = Blueprint("inventory-data-job-routes", __name__)
def update(): def update():
task_logger = logging.getLogger(TASK_LOGGER_NAME) task_logger = logging.getLogger(TASK_LOGGER_NAME)
config = current_app.config["INVENTORY_PROVIDER_CONFIG"]
config = current_app.config["INVENTORY_PROVIDER_CONFIG"]
for r in config["routers"]: for r in config["routers"]:
task_logger.info("fetching details for: %r" % r) task_logger.info("fetching details for: %r" % r)
...@@ -34,6 +34,10 @@ def update(): ...@@ -34,6 +34,10 @@ def update():
'inventory_provider.tasks.worker.snmp_refresh_interfaces', 'inventory_provider.tasks.worker.snmp_refresh_interfaces',
args=[r["hostname"], r["community"]]) args=[r["hostname"], r["community"]])
task_logger.debug(
'launching task: '
'inventory_provider.tasks.worker.update_inventory_system_cache')
return Response("OK") return Response("OK")
......
...@@ -29,18 +29,18 @@ class InventoryTask(Task): ...@@ -29,18 +29,18 @@ class InventoryTask(Task):
def __init__(self): def __init__(self):
pass pass
@staticmethod # @staticmethod
def save_key(hostname, key, value): # def save_key(hostname, key, value):
assert isinstance(value, str), \ # assert isinstance(value, str), \
"sanity failure: expected string data as value" # "sanity failure: expected string data as value"
r = get_redis(InventoryTask.config) # r = get_redis(InventoryTask.config)
r.hset( # r.hset(
name=hostname, # name=hostname,
key=key, # key=key,
value=value) # value=value)
InventoryTask.logger.debug( # InventoryTask.logger.debug(
"saved %s, key %s" % (hostname, key)) # "saved %s, key %s" % (hostname, key))
return "OK" # return "OK"
@staticmethod @staticmethod
def save_value(key, value): def save_value(key, value):
...@@ -60,19 +60,25 @@ class InventoryTask(Task): ...@@ -60,19 +60,25 @@ class InventoryTask(Task):
json.dumps(data_obj)) json.dumps(data_obj))
@staticmethod @staticmethod
def save_key_json(hostname, key, data_obj): def save_value_etree(key, xml_doc):
InventoryTask.save_key( InventoryTask.save_value(
hostname,
key,
json.dumps(data_obj))
@staticmethod
def save_key_etree(hostname, key, xml_doc):
InventoryTask.save_key(
hostname,
key, key,
etree.tostring(xml_doc, encoding='unicode')) etree.tostring(xml_doc, encoding='unicode'))
# @staticmethod
# def save_key_json(hostname, key, data_obj):
# InventoryTask.save_key(
# hostname,
# key,
# json.dumps(data_obj))
#
# @staticmethod
# def save_key_etree(hostname, key, xml_doc):
# InventoryTask.save_key(
# hostname,
# key,
# etree.tostring(xml_doc, encoding='unicode'))
class WorkerArgs(bootsteps.Step): class WorkerArgs(bootsteps.Step):
def __init__(self, worker, config_filename, **options): def __init__(self, worker, config_filename, **options):
...@@ -81,11 +87,11 @@ class WorkerArgs(bootsteps.Step): ...@@ -81,11 +87,11 @@ class WorkerArgs(bootsteps.Step):
InventoryTask.logger = logging.getLogger(constants.TASK_LOGGER_NAME) InventoryTask.logger = logging.getLogger(constants.TASK_LOGGER_NAME)
interfaces_key = "interface_services" # interfaces_key = "interface_services"
equipment_locations_key = "equipment_locations" # equipment_locations_key = "equipment_locations"
service_child_to_parents_key = "child_to_parent_circuit_relations" # service_child_to_parents_key = "child_to_parent_circuit_relations"
service_parent_to_children_key = "parent_to_children_circuit_relations" # service_parent_to_children_key = "parent_to_children_circuit_relations"
interface_status_key = "interface_statuses" # interface_status_key = "interface_statuses"
def worker_args(parser): def worker_args(parser):
...@@ -107,9 +113,8 @@ def snmp_refresh_interfaces(self, hostname, community): ...@@ -107,9 +113,8 @@ def snmp_refresh_interfaces(self, hostname, community):
logger.debug('STARTING: snmp_refresh_interfaces(%r, %r)' logger.debug('STARTING: snmp_refresh_interfaces(%r, %r)'
% (hostname, community)) % (hostname, community))
InventoryTask.save_key_json( InventoryTask.save_value_json(
hostname, 'snmp-interfaces:' + hostname,
"snmp-interfaces",
list(snmp.get_router_interfaces( list(snmp.get_router_interfaces(
hostname, hostname,
community, community,
...@@ -124,9 +129,8 @@ def netconf_refresh_config(self, hostname): ...@@ -124,9 +129,8 @@ def netconf_refresh_config(self, hostname):
logger = logging.getLogger(constants.TASK_LOGGER_NAME) logger = logging.getLogger(constants.TASK_LOGGER_NAME)
logger.debug('STARTING: netconf_refresh_config(%r)' % hostname) logger.debug('STARTING: netconf_refresh_config(%r)' % hostname)
InventoryTask.save_key_etree( InventoryTask.save_value_etree(
hostname, 'netconf:' + hostname,
"netconf",
juniper.load_config(hostname, InventoryTask.config["ssh"])) juniper.load_config(hostname, InventoryTask.config["ssh"]))
logger.debug('FINISHED: netconf_refresh_config(%r)' % hostname) logger.debug('FINISHED: netconf_refresh_config(%r)' % hostname)
...@@ -146,88 +150,69 @@ def netconf_refresh_config(self, hostname): ...@@ -146,88 +150,69 @@ def netconf_refresh_config(self, hostname):
@app.task() @app.task()
def update_interfaces_to_services(): def update_interfaces_to_services():
logger = logging.getLogger(constants.TASK_LOGGER_NAME) interface_services = defaultdict(list)
logger.error('HERE: update_interfaces_to_services') with db.connection(InventoryTask.config["ops-db"]) as cx:
for service in opsdb.get_circuits(cx):
equipment_interface = '%s:%s' % (
service['equipment'], service['interface_name'])
interface_services[equipment_interface].append(service)
r = get_redis(InventoryTask.config) r = get_redis(InventoryTask.config)
with db.connection(InventoryTask.config["ops-db"]) as cx: for key in r.scan_iter('opsdb:interface_services:*'):
services = opsdb.get_circuits(cx) r.delete(key)
for equipment_interface, services in interface_services.items():
mapped_interfaces = defaultdict(list) r.set(
for service in services: 'opsdb:interface_services:' + equipment_interface,
key = "{}::{}".format( json.dumps(services))
service["equipment"],
service["interface_name"]
)
mapped_interfaces[key].append(service)
# Puts lu services under the parent ae as well as their own interface
# eg. services on ae15.12 would be found under ae15 as well as ae15.12
if "." in service["interface_name"]:
key = "{}::{}".format(
service["equipment"],
service["interface_name"].split(".")[0]
)
mapped_interfaces[key].append(service)
r.delete(interfaces_key)
for key, value in mapped_interfaces.items():
r.hset(interfaces_key, key, json.dumps(value))
@app.task() @app.task()
def update_equipment_locations(): def update_equipment_locations():
r = get_redis(InventoryTask.config) r = get_redis(InventoryTask.config)
r.delete(equipment_locations_key) for key in r.scan_iter('opsdb:location:*'):
r.delete(key)
with db.connection(InventoryTask.config["ops-db"]) as cx: with db.connection(InventoryTask.config["ops-db"]) as cx:
for ld in opsdb.get_equipment_location_data(cx): for ld in opsdb.get_equipment_location_data(cx):
r.hset( r.set('opsdb:location:%s' % ld['equipment_name'], json.dumps(ld))
equipment_locations_key, ld["equipment_name"], json.dumps(ld))
@app.task() @app.task()
def update_circuit_hierarchy(): def update_circuit_hierarchy():
r = get_redis(InventoryTask.config)
children_to_parents = defaultdict(list)
parents_to_children = defaultdict(list)
with db.connection(InventoryTask.config["ops-db"]) as cx: with db.connection(InventoryTask.config["ops-db"]) as cx:
records = opsdb.get_circuit_hierarchy(cx) child_to_parents = defaultdict(list)
for relation in records: parent_to_children = defaultdict(list)
for relation in opsdb.get_circuit_hierarchy(cx):
parent_id = relation["parent_circuit_id"] parent_id = relation["parent_circuit_id"]
child_id = relation["child_circuit_id"] child_id = relation["child_circuit_id"]
parents_to_children[parent_id].append(relation) parent_to_children[parent_id].append(relation)
children_to_parents[child_id].append(relation) child_to_parents[child_id].append(relation)
r.delete(service_child_to_parents_key) r = get_redis(InventoryTask.config)
for child, parents in children_to_parents.items(): for key in r.scan_iter('opsdb:services:parents:*'):
r.hset(service_child_to_parents_key, child, json.dumps(parents)) r.delete(key)
for cid, parents in child_to_parents.items():
r.set('opsdb:services:parents:%d' % cid, json.dumps(parents))
r.delete(service_parent_to_children_key) for key in r.scan_iter('opsdb:services:children:*'):
for parent, children in parents_to_children.items(): r.delete(key)
r.hset( for cid, children in child_to_parents.items():
service_parent_to_children_key, parent, json.dumps(children)) r.set('opsdb:services:children:%d' % cid, json.dumps(children))
@app.task() @app.task()
def update_interface_statuses(): def update_interface_statuses():
r = get_redis(InventoryTask.config)
with db.connection(InventoryTask.config["ops-db"]) as cx: with db.connection(InventoryTask.config["ops-db"]) as cx:
services = opsdb.get_circuits(cx) services = opsdb.get_circuits(cx)
with db.connection(InventoryTask.config["alarms-db"]) as cx: with db.connection(InventoryTask.config["alarms-db"]) as cx:
with db.cursor(cx) as csr: with db.cursor(cx) as csr:
for service in services: for service in services:
key = "{}::{}".format( key = 'alarmsdb:interface_status:%s:%s' \
% (service['equipment'], service['interface_name'])
status = alarmsdb.get_last_known_interface_status(
csr,
service["equipment"], service["equipment"],
service["interface_name"] service["interface_name"])
) InventoryTask.save_value(key, status)
if not r.hexists(interface_status_key, key):
r.hset(interface_status_key,
key,
alarmsdb.get_last_known_interface_status(
csr,
service["equipment"],
service["interface_name"]
))
@app.task() @app.task()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment