Skip to content
Snippets Groups Projects
Commit ecaeddcf authored by Robert Latta's avatar Robert Latta
Browse files

switched to using interface_services and removed redundant code

parent d6544fe7
No related branches found
No related tags found
No related merge requests found
......@@ -5,8 +5,7 @@ import logging
import re
from copy import copy
from distutils.util import strtobool
from functools import lru_cache
from typing import Optional, List, Iterator
from typing import Optional, List
from flask import Blueprint, Response, request
from redis import Redis
......@@ -41,7 +40,7 @@ def _remove_duplicates_from_list(all_: list) -> list:
"""
removes duplicates from the input list
the list items must be encodable as json
:param l:
:param all_:
:return: a new list with unique elements
"""
tmp_dict = dict(
......@@ -75,18 +74,6 @@ def _location_from_services(services, r: Redis):
yield build_locations(loc_a, loc_b)
@lru_cache(256, typed=False)
def _location_from_port_id(port_id: Optional[str], r: Redis) -> Optional[dict]:
if not port_id:
return None
port_info = r.get(f'ims:port_id_interface:{port_id}')
if port_info:
port_info = json.loads(port_info.decode('utf-8'))
equipment_name = port_info['equipment_name']
return _location_from_equipment(equipment_name, r)
return None
class ClassifierRequestError(Exception):
status_code = 500
......@@ -213,89 +200,26 @@ def get_related_services(source_equipment: str, interface: str, r) -> dict:
yield from get_top_level_services(s['id'], r)
def get_top_level_services_by_port_id(port_id: str, r: Redis) -> Iterator:
services = r.get(f'ims:port_id_services:{port_id}')
if services:
for s in json.loads(services.decode('utf-8')):
yield from get_top_level_services(s['id'], r)
def get_full_service_info(s: dict, r: Redis) -> dict:
port_a_id = s.get('port_a_id', None)
port_b_id = s.get('port_b_id', None)
service = s.copy()
service.pop('port_a_id', None)
service.pop('port_b_id', None)
loc_a = _location_from_port_id(port_a_id, r)
loc_b = _location_from_port_id(port_b_id, r)
if loc_a:
service['pop_name'] = loc_a['name']
service['pop_abbreviation'] = loc_a['abbreviation']
service['equipment'] = loc_a['equipment']
else:
service['pop_name'] = ''
service['pop_abbreviation'] = ''
service['equipment'] = ''
if loc_b:
service['other_end_pop_name'] = loc_b['name']
service['other_end_pop_abbreviation'] = loc_b['abbreviation']
service['other_end_equipment'] = loc_b['equipment']
else:
service['other_end_pop_name'] = ''
service['other_end_pop_abbreviation'] = ''
service['other_end_equipment'] = ''
return service
def get_interface_services_and_locs(ims_source_equipment, ims_interface, r):
result = {
'locations': [],
'services': [],
'related-services': []
'locations': []
}
services_dict = {}
rs_dict = {}
port_info = r.get(
f'ims:interface_port_ids:{ims_source_equipment}:{ims_interface}')
if port_info:
port_info = json.loads(port_info.decode('utf-8'))
port_id_services = r.get(
f'ims:port_id_services:{port_info["port_id"]}')
if port_id_services:
port_id_services = json.loads(port_id_services.decode('utf-8'))
for ps in port_id_services:
services_dict[ps['id']] = get_full_service_info(ps, r)
port_a_id = ps.get('port_a_id', None)
port_b_id = ps.get('port_b_id', None)
rs_dict.update(
{x['id']: x for x in get_top_level_services_by_port_id(
port_a_id, r
)})
rs_dict.update(
{x['id']: x for x in get_top_level_services_by_port_id(
port_b_id, r
)})
loc = build_locations(
_location_from_port_id(port_a_id, r),
_location_from_port_id(port_b_id, r)
)
if loc:
result['locations'].append(loc)
result['services'] = list(services_dict.values())
result['related-services'] = list(rs_dict.values())
for r in result['related-services']:
r.pop('id', None)
if not result.get('locations'):
raw_services = r.get(
f'ims:interface_services:{ims_source_equipment}:{ims_interface}')
if raw_services:
result['services'] = json.loads(raw_services.decode('utf-8'))
result['related-services'] = \
list(get_related_services(ims_source_equipment, ims_interface, r))
result['locations'] = _location_from_services(result['services'], r)
if not result['services']:
result.pop('services', None)
if not result['related-services']:
result.pop('related-services', None)
if not result.get('locations', None):
locations = build_locations(
_location_from_equipment(ims_source_equipment, r))
result['locations'] = [locations] if locations else []
if not result['services']:
result.pop('services', None)
if not result['related-services']:
result.pop('related-services', None)
result['locations'] = _remove_duplicates_from_list(result['locations'])
return result
......@@ -386,12 +310,9 @@ def get_juniper_link_info(source_equipment: str, interface: str) -> Response:
result = result.decode('utf-8')
else:
result = {
'locations': []
'interface': _link_interface_info(r, source_equipment, interface)
}
result['interface'] = _link_interface_info(
r, source_equipment, interface)
bundle_members = r.get(
f'netconf-interface-bundles:{source_equipment}:{interface}')
if bundle_members:
......@@ -400,11 +321,13 @@ def get_juniper_link_info(source_equipment: str, interface: str) -> Response:
else:
result['interface']['bundle_members'] = []
result.update(get_interface_services_and_locs(
ims_source_equipment,
ims_interface,
r
))
result.update(
get_interface_services_and_locs(
ims_source_equipment,
ims_interface,
r
)
)
result = json.dumps(result)
# cache this data for the next call
......
......@@ -33,18 +33,6 @@ def update_fibre_spans_ims():
return Response('OK')
@routes.route("update-interfaces-to-port-ids", methods=['GET', 'POST'])
def update_interfaces_to_port_id_ims():
worker.update_interfaces_to_port_ids.delay(use_current=True)
return Response('OK')
@routes.route("update-port-ids-to-services", methods=['GET', 'POST'])
def update_port_ids_to_services_ims():
worker.update_port_ids_to_services.delay(use_current=True)
return Response('OK')
@routes.route("update-circuit-hierarchy", methods=['GET', 'POST'])
def update_circuit_hierarchy_ims():
worker.update_circuit_hierarchy.delay(use_current=True)
......
......@@ -451,8 +451,6 @@ def internal_refresh_phase_2(self):
subtasks = [
update_circuit_hierarchy.apply_async(),
update_interfaces_to_services.apply_asynch(),
update_interfaces_to_port_ids.apply_async(),
update_port_ids_to_services.apply_async(),
import_unmanaged_interfaces.apply_async()
]
......@@ -501,39 +499,6 @@ def update_fibre_spans(self, use_current=False):
rp.execute()
@app.task(
base=InventoryTask, bind=True, name='update_interfaces_to_port_ids')
@log_task_entry_and_exit
def update_interfaces_to_port_ids(self, use_current=False):
if use_current:
r = get_current_redis(InventoryTask.config)
else:
r = get_next_redis(InventoryTask.config)
rp = r.pipeline()
# scan with bigger batches, to mitigate network latency effects
for key in r.scan_iter('ims:interface_port_ids:*', count=2000):
rp.delete(key)
for key in r.scan_iter('ims:port_id_interface:*', count=2000):
rp.delete(key)
rp.execute()
c = InventoryTask.config["ims"]
ds = IMS(c['api'], c['username'], c['password'])
rp = r.pipeline()
for pd in ims_data.get_port_details(ds):
d = json.dumps(pd)
rp.set(
f'ims:interface_port_ids:{pd["equipment_name"]}'
f':{pd["interface_name"]}',
d)
rp.set(
f'ims:port_id_interface:{pd["port_id"]}',
d)
rp.execute()
@app.task(
base=InventoryTask, bind=True, name='update_interfaces_to_services')
@log_task_entry_and_exit
......@@ -601,37 +566,6 @@ def update_interfaces_to_services(self, use_current=False):
rp.execute()
@app.task(
base=InventoryTask, bind=True, name='update_port_ids_to_services')
@log_task_entry_and_exit
def update_port_ids_to_services(self, use_current=False):
interface_services = defaultdict(list)
c = InventoryTask.config["ims"]
ds = IMS(c['api'], c['username'], c['password'])
for service in ims_data.get_port_id_services(ds):
interface_services[service["port_a_id"]].append(service)
if use_current:
r = get_current_redis(InventoryTask.config)
else:
r = get_next_redis(InventoryTask.config)
rp = r.pipeline()
# scan with bigger batches, to mitigate network latency effects
for key in r.scan_iter('ims:port_id_services:*', count=2000):
rp.delete(key)
rp.execute()
rp = r.pipeline()
for port_id, services in interface_services.items():
rp.set(
f'ims:port_id_services:{port_id}',
json.dumps(services))
rp.execute()
@app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy')
@log_task_entry_and_exit
def update_circuit_hierarchy(self, use_current=False):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment