# testing.py 6.03 KiB
import html
import json
import logging
import os
from distutils.util import strtobool
from operator import itemgetter

from flask import Blueprint, Response, jsonify, current_app, request
from lxml import etree
from tree_format import format_tree

from inventory_provider import juniper
from inventory_provider.routes import common
from inventory_provider.tasks import worker
from inventory_provider.tasks import common as worker_common
# Blueprint that every route handler in this module registers itself on.
routes = Blueprint("inventory-data-testing-support-routes", __name__)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@routes.route("flushdb", methods=['GET'])
def flushdb():
    """Flush the Redis db returned by ``common.get_current_redis()``.

    :return: a response whose body is the literal text ``OK``
    """
    redis_db = common.get_current_redis()
    redis_db.flushdb()
    return Response('OK')
# IMS routes
@routes.route("update-equipment-locations", methods=['GET'])
def update_equipment_locations_ims():
    """Queue the ``update_equipment_locations`` worker task.

    The task is dispatched with ``use_current=True`` via ``.delay()``;
    this handler does not wait for a result.

    :return: a response whose body is the literal text ``OK``
    """
    task = worker.update_equipment_locations
    task.delay(use_current=True)
    return Response('OK')
@routes.route("update-lg-routers", methods=['GET'])
def update_lg_routers_ims():
    """Queue the ``update_lg_routers`` worker task.

    The task is dispatched with ``use_current=True`` via ``.delay()``;
    this handler does not wait for a result.

    :return: a response whose body is the literal text ``OK``
    """
    task = worker.update_lg_routers
    task.delay(use_current=True)
    return Response('OK')
@routes.route(
    "update-circuit-hierarchy-and-port-id-services", methods=['GET'])
def get_circuit_hierarchy_and_port_id_services():
    """Queue the ``update_circuit_hierarchy_and_port_id_services`` task.

    The task is dispatched without arguments via ``.delay()``; this
    handler does not wait for a result.

    :return: a response whose body is the literal text ``OK``
    """
    task = worker.update_circuit_hierarchy_and_port_id_services
    task.delay()
    return Response('OK')
# End of IMS routes
@routes.route("juniper-server-addresses", methods=['GET'])
@common.require_accepts_json
def juniper_addresses():
    """Return the JSON document stored under the ``netdash`` Redis key.

    :return: the decoded value as a JSON response, or a 404 response
        when the key is missing or empty
    """
    # TODO: this route (and corant, infinera routes) can be removed
    r = common.get_current_redis()
    routers = r.get('netdash')
    if not routers:
        # The value shouldn't be empty, but an ``assert`` is stripped
        # under ``python -O`` and otherwise surfaces as an opaque 500,
        # so report the missing data explicitly instead.
        return Response(
            response="no available info for 'netdash'",
            status=404,
            mimetype="text/html")
    return jsonify(json.loads(routers.decode('utf-8')))
@routes.route("bgp/<hostname>", methods=['GET'])
@common.require_accepts_json
def bgp_configs(hostname):
    """Return the BGP peers parsed from a router's cached netconf data.

    The netconf document is read from the Redis key
    ``netconf:<hostname>`` and handed to ``juniper.all_bgp_peers``.

    :param hostname: router hostname, taken from the URL
    :return: JSON list of peers, or a 404 response when either the
        cached document or the resulting peer list is empty
    """
    r = common.get_current_redis()
    netconf_string = r.get('netconf:' + hostname)
    # hostname comes straight from the URL and the error bodies are
    # served as text/html, so it must be escaped before interpolation.
    safe_hostname = html.escape(hostname, quote=False)
    if not netconf_string:
        return Response(
            response="no available info for '%s'" % safe_hostname,
            status=404,
            mimetype="text/html")
    peers = list(juniper.all_bgp_peers(
        etree.XML(netconf_string.decode('utf-8'))))
    if not peers:
        # This route lists bgp peers, not interfaces.
        return Response(
            response="no bgp peers found for '%s'" % safe_hostname,
            status=404,
            mimetype="text/html")
    return jsonify(peers)
@routes.route("snmp/<hostname>", methods=['GET'])
@common.require_accepts_json
def snmp_ids(hostname):
    """Return the cached SNMP interface data for a router.

    The data is read from the Redis key ``snmp-interfaces:<hostname>``.

    :param hostname: router hostname, taken from the URL
    :return: the decoded value as a JSON response, or a 404 response
        when nothing is cached for this hostname
    """
    # NOTE(review): this reads from get_next_redis() whereas the sibling
    # routes use get_current_redis() - confirm this is intentional.
    r = common.get_next_redis()
    ifc_data_string = r.get('snmp-interfaces:' + hostname)
    if not ifc_data_string:
        # A missing key would otherwise fail on None.decode() with a 500.
        # hostname is escaped because the body is served as text/html.
        return Response(
            response="no available info for '%s'"
                     % html.escape(hostname, quote=False),
            status=404,
            mimetype="text/html")
    ifc_data = json.loads(ifc_data_string.decode('utf-8'))
    return jsonify(ifc_data)
@routes.route("netconf/<hostname>", methods=['GET'])
def get_netconf(hostname):
    """Load a router's netconf document and return it serialized.

    The document is loaded with ``juniper.load_config`` using the
    ``ssh`` section of the app's ``INVENTORY_PROVIDER_CONFIG`` and with
    validation disabled.

    :param hostname: router hostname, taken from the URL
    :return: the XML document (``text/xml``), or a plain-text error
        message if loading raised ``ConnectionError`` or
        ``juniper.NetconfHandlingError``
    """
    config = current_app.config["INVENTORY_PROVIDER_CONFIG"]
    try:
        netconf_doc = juniper.load_config(
            hostname, config["ssh"], validate=False)
        msg = etree.tostring(netconf_doc, encoding='unicode')
    except (ConnectionError, juniper.NetconfHandlingError) as e:
        # Serve the message as plain text: returning a bare str makes
        # Flask label it text/html, which would let the URL-supplied
        # hostname be interpreted as markup.
        # NOTE(review): the status is left at 200 to keep existing
        # clients working - consider an error status here.
        return Response(
            f'error loading netconf data from {hostname}\n{e}',
            mimetype='text/plain')
    # Label the XML as XML rather than Flask's default text/html.
    return Response(msg, mimetype='text/xml')
@routes.route("routers-list", methods=['GET'])
def routers_from_config_dir():
    """Return the contents of the file named by ``FAKE_ROUTERS_FILE``.

    :return: the file contents as a ``text/plain`` response
    :raises KeyError: if the ``FAKE_ROUTERS_FILE`` environment variable
        is not set
    """
    with open(os.environ['FAKE_ROUTERS_FILE']) as f:
        # "text.plain" was not a valid media type.
        return Response(f.read(), mimetype="text/plain")
@routes.route("latchdb", methods=['GET'])
def latch_db():
    """Call ``worker_common.latch_db`` and report the resulting latch.

    :return: JSON response holding whatever ``worker_common.get_latch``
        returns for the current Redis connection
    """
    provider_config = current_app.config["INVENTORY_PROVIDER_CONFIG"]
    worker_common.latch_db(provider_config)
    current_redis = worker_common.get_current_redis(provider_config)
    latch = worker_common.get_latch(current_redis)
    return jsonify(latch)
@routes.route("/circuit-tree/<path:root_identifier>",
              methods=['GET'])
def circuit_tree(root_identifier: str):
    """Render a circuit hierarchy as a text tree wrapped in ``<pre>``.

    Query parameters:

    * ``carriers`` (bool-like string, default ``false``): follow each
      record's ``carrier-circuits`` list instead of ``sub-circuits``.
    * ``interface`` (bool-like string, default ``false``): treat
      ``root_identifier`` as the suffix of an
      ``ims:interface_services:`` key and draw one subtree per service
      stored there.
    * ``max-depth`` (int, default ``0`` = unlimited): levels below which
      children are replaced by a single ``...`` node.

    When ``interface`` is false, ``root_identifier`` is used as a
    circuit id if it parses as an int; otherwise all
    ``ims:circuit_hierarchy:*`` records are scanned for one whose
    ``name`` matches case-insensitively.

    :param root_identifier: circuit id, circuit name, or interface
        identifier, taken from the URL
    :return: HTML string with the formatted tree, or a short message
        when nothing matches
    """
    carriers = strtobool(
        request.args.get('carriers', default='false', type=str))
    interface_ = strtobool(
        request.args.get('interface', default='false', type=str))
    max_depth = request.args.get('max-depth', default=0, type=int)

    children_prop = 'carrier-circuits' if carriers else 'sub-circuits'

    config = current_app.config["INVENTORY_PROVIDER_CONFIG"]
    r = worker_common.get_current_redis(config)

    def _get_childcircuit_tree_local(circuit_id, depth):
        # Build a [label, children] node for one circuit, or None when
        # there is no hierarchy record for circuit_id.
        circuit = r.get(f'ims:circuit_hierarchy:{circuit_id}')
        if not circuit:
            return None
        circuit = json.loads(circuit.decode('utf-8'))[0]
        label = (
            f'{circuit["id"]} -- {circuit["name"]} -- '
            f'prod: {circuit["product"]} -- '
            f'spd: {circuit["speed"]} -- '
            f'status: {circuit["status"]}'
        )
        child_ids = circuit.get(children_prop, None)
        if not child_ids:
            return [label, []]
        if max_depth != 0 and depth >= max_depth:
            # Depth limit reached: mark the cut with a placeholder node.
            return [label, [['...', []]]]
        children = []
        for child_id in child_ids:
            # Every sibling sits one level below this node. Incrementing
            # a shared counter per sibling would make later siblings
            # look deeper than earlier ones.
            subtree = _get_childcircuit_tree_local(child_id, depth + 1)
            if subtree:
                children.append(subtree)
        return [label, children]

    if interface_:
        if_services = r.get(f'ims:interface_services:{root_identifier}')
        if not if_services:
            return "Nothing found"
        subtrees = (
            _get_childcircuit_tree_local(s['id'], 1)
            for s in json.loads(if_services))
        # Drop services without a hierarchy record; a None child would
        # make format_tree fail.
        tree = [root_identifier, [t for t in subtrees if t]]
    else:
        try:
            root_identifier = int(root_identifier)
        except ValueError:
            # Not numeric: look the circuit up by name.
            wanted_name = root_identifier.lower()
            for k in r.scan_iter('ims:circuit_hierarchy:*', count=2000):
                ch = r.get(k.decode('utf-8'))
                if not ch:
                    # Key disappeared between the scan and the read.
                    continue
                details = json.loads(ch.decode('utf-8'))[0]
                if wanted_name == details['name'].lower():
                    root_identifier = details['id']
                    break
            else:
                # The reply is served as HTML, so escape the URL input.
                not_found = html.escape(root_identifier, quote=False)
                return f'No circuit found for: {not_found}'
        tree = _get_childcircuit_tree_local(root_identifier, 1)
        if tree is None:
            # No hierarchy record for this id; format_tree cannot
            # render None.
            not_found = html.escape(str(root_identifier), quote=False)
            return f'No circuit found for: {not_found}'

    rendered = format_tree(
        tree,
        format_node=itemgetter(0),
        get_children=itemgetter(1))
    # Labels contain the URL input and names read from Redis; escape
    # them so they are shown as text inside the <pre> element.
    return f'<pre>{html.escape(rendered, quote=False)}</pre>'