# testing.py - testing-support routes for the inventory provider
import html
import json
import logging
import os
from distutils.util import strtobool
from operator import itemgetter

from flask import Blueprint, Response, jsonify, current_app, request
from lxml import etree
from tree_format import format_tree

from inventory_provider import juniper
from inventory_provider.routes import common
from inventory_provider.tasks import worker
from inventory_provider.tasks import common as worker_common

# blueprint that every testing-support route in this module is attached to
routes = Blueprint("inventory-data-testing-support-routes", __name__)

# module-level logger, named after this module
logger = logging.getLogger(__name__)


@routes.route("flushdb", methods=['GET'])
def flushdb():
    """
    Flush the redis database returned by ``common.get_current_redis()``.

    :return: a response with body ``OK``
    """
    redis_client = common.get_current_redis()
    redis_client.flushdb()
    return Response('OK')


# IMS routes


@routes.route("update-equipment-locations", methods=['GET'])
def update_equipment_locations_ims():
    """
    Trigger the ``update_equipment_locations`` worker task.

    The task is started through its ``.delay()`` method with
    ``use_current=True`` (presumably an asynchronous Celery dispatch,
    so the response doesn't wait for the task - confirm against the
    worker module).

    :return: a response with body ``OK``
    """
    task = worker.update_equipment_locations
    task.delay(use_current=True)
    return Response('OK')


@routes.route("update-lg-routers", methods=['GET'])
def update_lg_routers_ims():
    """
    Trigger the ``update_lg_routers`` worker task.

    The task is started through its ``.delay()`` method with
    ``use_current=True`` (presumably an asynchronous Celery dispatch -
    confirm against the worker module).

    :return: a response with body ``OK``
    """
    task = worker.update_lg_routers
    task.delay(use_current=True)
    return Response('OK')


@routes.route(
    "update-circuit-hierarchy-and-port-id-services", methods=['GET'])
def get_circuit_hierarchy_and_port_id_services():
    """
    Trigger the ``update_circuit_hierarchy_and_port_id_services`` task.

    Despite the ``get_`` prefix this view only starts the worker task
    through its ``.delay()`` method (presumably an asynchronous Celery
    dispatch - confirm against the worker module); no data is returned.

    :return: a response with body ``OK``
    """
    task = worker.update_circuit_hierarchy_and_port_id_services
    task.delay()
    return Response('OK')
# End of IMS routes


@routes.route("juniper-server-addresses", methods=['GET'])
@common.require_accepts_json
def juniper_addresses():
    """
    Return the json document stored under the ``netdash`` redis key.

    :return: the decoded value as json, or a 404 response if the key
        is missing or empty
    """
    # TODO: this route (and corant, infinera routes) can be removed
    r = common.get_current_redis()
    routers = r.get('netdash')
    if not routers:
        # the value shouldn't be empty, but check explicitly: an
        # `assert` is stripped under `python -O` and the failure would
        # then surface as an AttributeError on None
        return Response(
            response="no available info for 'netdash'",
            status=404,
            mimetype="text/html")
    return jsonify(json.loads(routers.decode('utf-8')))


@routes.route("bgp/<hostname>", methods=['GET'])
@common.require_accepts_json
def bgp_configs(hostname):
    """
    Return the BGP peers parsed from a router's cached netconf document.

    The document is read from the ``netconf:<hostname>`` redis key and
    parsed with ``juniper.all_bgp_peers``.

    :param hostname: router hostname, taken from the request path
    :return: the peers as a json list, or a 404 response if there's no
        cached document or no peers were found in it
    """
    r = common.get_current_redis()
    netconf_string = r.get('netconf:' + hostname)

    # hostname is caller-supplied and the error bodies are served as
    # text/html, so it's escaped before being echoed back
    safe_hostname = html.escape(hostname, quote=False)

    if not netconf_string:
        return Response(
            response="no available info for '%s'" % safe_hostname,
            status=404,
            mimetype="text/html")

    peers = list(juniper.all_bgp_peers(
        etree.XML(netconf_string.decode('utf-8'))))

    if not peers:
        # this route reports bgp peers, not interfaces
        return Response(
            response="no bgp peers found for '%s'" % safe_hostname,
            status=404,
            mimetype="text/html")

    return jsonify(peers)


@routes.route("snmp/<hostname>", methods=['GET'])
@common.require_accepts_json
def snmp_ids(hostname):
    """
    Return the cached snmp interface data for a router.

    The data is read from the ``snmp-interfaces:<hostname>`` redis key
    and decoded as json.

    :param hostname: router hostname, taken from the request path
    :return: the decoded data as json, or a 404 response if nothing is
        cached for this hostname
    """
    # NOTE(review): this reads from get_next_redis() while the sibling
    # routes read from get_current_redis() - confirm this is intended
    r = common.get_next_redis()
    ifc_data_string = r.get('snmp-interfaces:' + hostname)
    if not ifc_data_string:
        # a missing key previously crashed on None.decode(); report it
        # the same way bgp_configs does (hostname is escaped because
        # the body is served as text/html)
        return Response(
            response="no available info for '%s'"
                     % html.escape(hostname, quote=False),
            status=404,
            mimetype="text/html")
    return jsonify(json.loads(ifc_data_string.decode('utf-8')))


@routes.route("netconf/<hostname>", methods=['GET'])
def get_netconf(hostname):
    """
    Load a router's netconf document and return it serialized as text.

    The document is loaded with ``juniper.load_config`` using the
    ``ssh`` section of the app's ``INVENTORY_PROVIDER_CONFIG``, with
    ``validate=False``.

    :param hostname: router hostname, taken from the request path
    :return: the serialized document; if loading fails with
        ``ConnectionError`` or ``juniper.NetconfHandlingError`` an
        error text is returned as the body instead (no error status
        is set explicitly)
    """
    provider_config = current_app.config["INVENTORY_PROVIDER_CONFIG"]
    try:
        document = juniper.load_config(
            hostname, provider_config["ssh"], validate=False)
        return etree.tostring(document, encoding='unicode')
    except (ConnectionError, juniper.NetconfHandlingError) as e:
        return f'error loading netconf data from {hostname}\n{e}'


@routes.route("routers-list", methods=['GET'])
def routers_from_config_dir():
    """
    Return the contents of the file named by ``FAKE_ROUTERS_FILE``.

    :return: the file contents as a ``text/plain`` response
    :raises KeyError: if the ``FAKE_ROUTERS_FILE`` environment variable
        isn't set
    """
    with open(os.environ['FAKE_ROUTERS_FILE']) as f:
        # was "text.plain", which isn't a valid mimetype
        return Response(f.read(), mimetype="text/plain")


@routes.route("latchdb", methods=['GET'])
def latch_db():
    """
    Call ``worker_common.latch_db`` and report the resulting latch.

    :return: json of whatever ``worker_common.get_latch`` returns for
        the current redis connection after latching
    """
    provider_config = current_app.config["INVENTORY_PROVIDER_CONFIG"]
    worker_common.latch_db(provider_config)
    latch = worker_common.get_latch(
        worker_common.get_current_redis(provider_config))
    return jsonify(latch)


@routes.route("/circuit-tree/<path:root_identifier>",
              methods=['GET'])
def circuit_tree(root_identifier: str):
    """
    Render the cached circuit hierarchy below a circuit or interface.

    Circuits are read from ``ims:circuit_hierarchy:<id>`` redis keys.

    Query parameters:

    - ``carriers``: boolean, parsed with ``strtobool`` (default false);
      when true the tree follows each circuit's ``carrier-circuits``,
      otherwise its ``sub-circuits``
    - ``interface``: boolean, parsed with ``strtobool`` (default false);
      when true ``root_identifier`` is looked up under
      ``ims:interface_services:<root_identifier>`` and each service
      listed there becomes a branch below the root
    - ``max-depth``: integer, ``0`` (default) means unlimited; circuits
      at this depth (the root is depth 1) that have children show
      ``...`` instead of those children

    :param root_identifier: a circuit id, or a circuit name (matched
        case-insensitively against the cached circuits), or - with
        ``interface`` set - an interface services key suffix
    :return: the formatted tree wrapped in ``<pre>``, or a short
        message if nothing was found
    """
    carriers = \
        strtobool(request.args.get('carriers', default='false', type=str))
    interface_ = \
        strtobool(request.args.get('interface', default='false', type=str))
    max_depth = request.args.get('max-depth', default=0, type=int)
    children_prop = 'carrier-circuits' if carriers else 'sub-circuits'

    config = current_app.config["INVENTORY_PROVIDER_CONFIG"]
    r = worker_common.get_current_redis(config)

    def _get_childcircuit_tree_local(circuit_id, depth):
        # returns [label, children], or None if the circuit isn't cached
        circuit = r.get(f'ims:circuit_hierarchy:{circuit_id}')
        if not circuit:
            return None
        circuit = json.loads(circuit.decode('utf-8'))[0]
        label = (
            f'{circuit["id"]} -- {circuit["name"]} -- '
            f'prod: {circuit["product"]} -- '
            f'spd: {circuit["speed"]} -- '
            f'status: {circuit["status"]}'
        )

        child_ids = circuit.get(children_prop)
        if not child_ids:
            return [label, []]
        if max_depth != 0 and depth >= max_depth:
            # depth limit reached: show a placeholder for the children
            return [label, [['...', []]]]

        # every child sits exactly one level below this circuit; the
        # depth must not accumulate from one sibling to the next
        subtrees = (
            _get_childcircuit_tree_local(child_id, depth + 1)
            for child_id in child_ids)
        return [label, [t for t in subtrees if t]]

    if interface_:
        if_services = r.get(f'ims:interface_services:{root_identifier}')
        if not if_services:
            return "Nothing found"
        subtrees = (
            _get_childcircuit_tree_local(s['id'], 1)
            for s in json.loads(if_services))
        # services whose circuit isn't cached are skipped, format_tree
        # can't render a None node
        tree = [root_identifier, [t for t in subtrees if t]]
    else:
        try:
            root_identifier = int(root_identifier)
        except ValueError:
            # not a numeric id: search the cached circuits by name
            for k in r.scan_iter('ims:circuit_hierarchy:*', count=2000):
                ch = r.get(k.decode('utf-8'))
                if not ch:
                    # key disappeared between the scan and the read
                    continue
                details = json.loads(ch.decode('utf-8'))[0]
                if root_identifier.lower() == details['name'].lower():
                    root_identifier = details['id']
                    break
            else:
                # the body is served as html, so escape the echoed input
                return 'No circuit found for: ' \
                    + html.escape(root_identifier, quote=False)
        tree = _get_childcircuit_tree_local(root_identifier, 1)
        if not tree:
            # unknown id: format_tree can't render a None tree
            return 'No circuit found for: ' \
                + html.escape(str(root_identifier), quote=False)

    f = format_tree(
        tree,
        format_node=itemgetter(0),
        get_children=itemgetter(1))
    # circuit names etc. come from stored data: escape markup characters
    # so they're displayed literally inside the <pre> element
    return f'<pre>{html.escape(f, quote=False)}</pre>'