classifier.py 11.26 KiB
import ipaddress
import json
import logging
import re
from flask import Blueprint, Response, current_app
from inventory_provider.routes import common
from inventory_provider.db import opsdb, db
routes = Blueprint("inventory-data-classifier-support-routes", __name__)
logger = logging.getLogger(__name__)
class ClassifierRequestError(Exception):
status_code = 500
def __init__(self):
super().__init__()
self.message = "Unclassified Internal Error"
class ClassifierProcessingError(ClassifierRequestError):
status_code = 422
def __init__(self, message, status_code=None):
super().__init__()
self.message = str(message)
if status_code is not None:
self.status_code = status_code
@routes.errorhandler(ClassifierRequestError)
def handle_request_error(error):
return Response(
response=error.message,
status=error.status_code)
@routes.after_request
def after_request(resp):
return common.after_request(resp)
def related_interfaces(hostname, interface):
r = common.get_current_redis()
prefix = 'netconf-interfaces:%s:' % hostname
for k in r.keys(prefix + interface + '.*'):
k = k.decode('utf-8')
assert k.startswith(prefix) # sanity
assert len(k) > len(prefix) # sanity (contains at least an interface)
yield k[len(prefix):]
def get_top_level_services(circuit_id, r):
tls = []
results = r.get("opsdb:services:children:{}".format(circuit_id))
if results:
results = json.loads(results.decode('utf-8'))
for c in results:
temp_parents = \
get_top_level_services(c['parent_circuit_id'], r)
if not temp_parents:
tls.append(
{'name': c['parent_circuit'],
'status': c['parent_circuit_status'],
'circuit_type': c['parent_circuit_type'].lower()})
tls.extend(temp_parents)
return tls
@routes.route("/juniper-link-info/<source_equipment>/<path:interface>",
methods=['GET', 'POST'])
@common.require_accepts_json
def get_juniper_link_info(source_equipment, interface):
r = common.get_current_redis()
cache_key = 'classifier-cache:juniper:%s:%s' % (
source_equipment, interface)
result = r.get(cache_key)
if result:
result = result.decode('utf-8')
else:
result = {}
top_level_services = []
services = r.get(
'opsdb:interface_services:%s:%s' % (source_equipment, interface))
if services:
result['services'] = json.loads(services.decode('utf=8'))
for s in result['services']:
top_level_services.extend(get_top_level_services(s['id'], r))
ifc_info = r.get(
'netconf-interfaces:%s:%s' % (source_equipment, interface))
if ifc_info:
result['interface'] = json.loads(ifc_info.decode('utf-8'))
else:
# warning: this should match the structure returned by
# juniper:list_interfaces:_ifc_info
result['interface'] = {
'name': interface,
'description': '',
'bundle': [],
'ipv4': [],
'ipv6': []
}
def _related_services():
for related in related_interfaces(source_equipment, interface):
rs = r.get('opsdb:interface_services:%s:%s'
% (source_equipment, related))
all_rs = []
if rs:
for s in json.loads(rs.decode('utf-8')):
top_level_services.extend(
get_top_level_services(s['id'], r))
all_rs.append(
{
'name': s['name'],
'status': s['status'],
'circuit_type': s['circuit_type']
})
return all_rs
related_services = _related_services()
if related_services:
top_level_services.extend(related_services)
if top_level_services:
result['related-services'] = top_level_services
result = json.dumps(result)
# cache this data for the next call
r.set(cache_key, result.encode('utf-8'))
return Response(result, mimetype="application/json")
def ix_peering_info(peer_info):
"""
TODO: this is probably the least efficient way of doing this
(if it's a problem, pre-compute these lists)
:param peer_info: an element from ix_public_peer:address
:return:
"""
result = {
'peer': peer_info,
'group': [],
'router': []
}
try:
address = ipaddress.ip_address(peer_info['name'])
except ValueError:
raise ClassifierProcessingError(
'unable to parse %r as an ip address' % address)
description = peer_info['description']
assert description is not None # sanity
keyword = description.split(' ')[0] # regex needed??? (e.g. tabs???)
r = common.get_current_redis()
for k in r.keys('ix_public_peer:*'):
other = r.get(k.decode('utf-8')).decode('utf-8')
other = json.loads(other)
if other['router'] == peer_info['router']:
result['router'].append(other['name'])
assert other['description'] is not None # sanity: as above...
if other['description'].startswith(keyword):
result['group'].append(other['name'])
return result
def find_interfaces(address):
"""
TODO: this is probably the least efficient way of doing this
(if it's a problem, pre-compute these lists)
:param address: an ipaddress object
:return:
"""
r = common.get_current_redis()
for k in r.keys('subnets:*'):
k = k.decode('utf-8')
m = re.match(r'^subnets:(.*)$', k)
assert m, 'sanity failure: redis returned an invalid key name'
interface = ipaddress.ip_interface(m.group(1))
if address in interface.network:
info = r.get(k).decode('utf-8')
info = json.loads(info)
for ifc in info:
yield ifc
def find_interfaces_and_services(address_str):
"""
:param address_str: an ipaddress object
:return:
"""
try:
address = ipaddress.ip_address(address_str)
except ValueError:
raise ClassifierProcessingError(
'unable to parse %r as an ip address' % address_str)
r = common.get_current_redis()
for interface in find_interfaces(address):
services = r.get(
'opsdb:interface_services:%s:%s' % (
interface['router'],
interface['interface name']))
if not services:
services = []
else:
services = json.loads(services.decode('utf=8'))
yield {
'interface': interface,
'services': services
}
@routes.route("/peer-info/<address>", methods=['GET', 'POST'])
@common.require_accepts_json
def peer_info(address):
# canonicalize the input address first ...
try:
obj = ipaddress.ip_address(address)
address = obj.exploded
except ValueError:
raise ClassifierProcessingError(
'unable to parse %r as an ip address' % address)
r = common.get_current_redis()
cache_key = 'classifier-cache:peer:%s' % address
result = r.get(cache_key)
if result:
result = result.decode('utf-8')
else:
result = {}
info = r.get('ix_public_peer:%s' % address)
if info:
info = info.decode('utf-8')
result['ix-public-peer-info'] = ix_peering_info(json.loads(info))
info = r.get('vpn_rr_peer:%s' % address)
if info:
info = info.decode('utf-8')
result['vpn-rr-peer-info'] = json.loads(info)
interfaces = list(find_interfaces_and_services(address))
if interfaces:
result['interfaces'] = interfaces
result = json.dumps(result)
# cache this data for the next call
r.set(cache_key, result.encode('utf-8'))
return Response(result, mimetype="application/json")
@routes.route("/infinera-lambda-info/"
"<source_equipment>/<interface>/<circuit_id>",
methods=['GET', 'POST'])
@common.require_accepts_json
def get_trap_metadata(source_equipment, interface, circuit_id):
interface = interface.replace('-T', '-')
cache_key = 'classifier-cache:infinera:%s:%s' % (
source_equipment, interface)
r = common.get_current_redis()
result = r.get(cache_key)
if result:
result = result.decode('utf-8')
else:
result = {}
services = r.get(
'opsdb:interface_services:%s:%s' % (source_equipment, interface))
if services:
result['services'] = json.loads(services.decode('utf=8'))
gl = r.get('opsdb:geant_lambdas:%s' % circuit_id.lower())
if gl:
t_gl = json.loads(gl.decode('utf=8'))
t_gl['status'] = t_gl['status'].lower()
result['geant-lambda'] = t_gl
if not result:
return Response(
response="no available info for {} {}".format(
source_equipment, interface),
status=404,
mimetype="text/html")
result = json.dumps(result)
# cache this data for the next call
r.set(cache_key, result.encode('utf-8'))
return Response(result, mimetype="application/json")
@routes.route('/coriant-info/<equipment_name>/<path:entity_string>',
methods=['GET', 'POST'])
@common.require_accepts_json
def get_coriant_info(equipment_name, entity_string):
r = common.get_current_redis()
cache_key = 'classifier-cache:coriant:%s:%s' % (
equipment_name, entity_string)
result = r.get(cache_key)
# this is just for development to save deleting the cache every time
# result = False
if result:
result = result.decode('utf-8')
else:
m = re.match(r'^(\d+\-\d+)\.(\d+)', entity_string)
if not m:
logger.error(
'invalid coriant entity string format: %r' % entity_string)
return Response(
response="no available info for '{}' '{}'".format(
equipment_name, entity_string),
status=404,
mimetype="text/html")
result = {
'equipment name': equipment_name,
'card id': m.group(1),
'port number': m.group(2)
}
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
with db.connection(config['ops-db']) as cx:
path = opsdb.lookup_coriant_path(
cx, equipment_name, result['card id'], result['port number'])
if path:
result['path'] = path
top_level_services = get_top_level_services(path['id'], r)
if top_level_services:
result['related-services'] = top_level_services
else:
logger.error('no path found for {}:{}'.format(
equipment_name, entity_string))
# cache this data for the next call
result = json.dumps(result).encode('utf-8')
r.set(cache_key, result)
return Response(result, mimetype="application/json")