diff --git a/inventory_provider/routes/classifier.py b/inventory_provider/routes/classifier.py index f66284e7e92e40a82199fea768a590a63282962e..bc5ffed3e78cb15dbcef5e7cb29af95e5793c569 100644 --- a/inventory_provider/routes/classifier.py +++ b/inventory_provider/routes/classifier.py @@ -17,12 +17,6 @@ These endpoints are intended for use by Dashboard V3. .. autofunction:: inventory_provider.routes.classifier.get_juniper_link_info -/classifier/infinera-lambda-info --------------------------------- - -.. autofunction:: inventory_provider.routes.classifier.get_infinera_lambda_info - - /classifier/infinera-fiberlink-info ------------------------------------ diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py index b33476fb1d031b4133585eb204c2ca587b466b59..44e7e8ec9fb7eee700b6de2a4b1ed086bc5a7ee9 100644 --- a/inventory_provider/routes/jobs.py +++ b/inventory_provider/routes/jobs.py @@ -108,13 +108,12 @@ def after_request(resp): return common.after_request(resp) -@routes.route("/update", methods=['GET', 'POST']) -@common.require_accepts_json +@routes.route("update", methods=['GET', 'POST']) def update(): """ Handler for `/jobs/update`. - This resource updates the inventory network data for juniper devices. + This resource updates the inventory network data. The function completes asynchronously and a list of outstanding task id's is returned so the caller can use `/jobs/check-task-status` to determine when all jobs @@ -135,17 +134,15 @@ def update(): response='an update is already in progress', status=503, mimetype="text/html") - - phase2_task_id = worker.launch_refresh_cache_all(config) - - r.set('classifier-cache:update-task-id', phase2_task_id.encode('utf-8')) - return jsonify({'task id': phase2_task_id}) + update_task_id = worker.update_entry_point.delay().get() + r.set('classifier-cache:update-task-id', update_task_id.encode('utf-8')) + return jsonify({'task id': update_task_id}) @routes.route("reload-router-config/<equipment_name>", methods=['GET', 'POST']) @common.require_accepts_json def reload_router_config(equipment_name): - result = worker.reload_router_config.delay(equipment_name) + result = worker.reload_router_config_chorded.delay(equipment_name) return jsonify({'task id': result.id}) diff --git a/inventory_provider/routes/testing.py b/inventory_provider/routes/testing.py index 8240efe5e92438674a881339b140f398f25b5695..f81982283885785da1dfaad497ba8e8ed1899f1c 100644 --- a/inventory_provider/routes/testing.py +++ b/inventory_provider/routes/testing.py @@ -18,12 +18,6 @@ routes = Blueprint("inventory-data-testing-support-routes", __name__) logger = logging.getLogger(__name__) -@routes.route("chord-update", methods=['GET', 'POST']) -def chord_update(): - r = worker.update_entry_point.delay().get() - return jsonify(r) - - @routes.route("flushdb", methods=['GET', 'POST']) def flushdb(): common.get_current_redis().flushdb() diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index fa583a3ec3e68f595ced6035f5a04230b0a5d24d..7e415fd5153ecb6309822bb90043af5ecd050aec 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -4,19 +4,15 @@ import json import logging import os import re -import threading -import time from typing import List -from redis.exceptions import RedisError -from kombu.exceptions import KombuError - from celery import Task, states, chord from celery.result import AsyncResult from collections import defaultdict + +from kombu.exceptions import KombuError from lxml import etree -import jsonschema from inventory_provider.db import ims_data from inventory_provider.db.ims import IMS @@ -36,6 +32,7 @@ from inventory_provider import config from inventory_provider import environment from inventory_provider import snmp from inventory_provider import juniper +from redis import RedisError FINALIZER_POLLING_FREQUENCY_S = 2.5 FINALIZER_TIMEOUT_S = 300 @@ -100,32 +97,6 @@ class InventoryTask(Task): self.send_event('task-error', message=message) -@app.task(base=InventoryTask, bind=True, name='snmp_refresh_peerings') -@log_task_entry_and_exit -def snmp_refresh_peerings(self, hostname, community, logical_systems): - try: - peerings = list( - snmp.get_peer_state_info(hostname, community, logical_systems)) - except ConnectionError: - msg = f'error loading snmp peering data from {hostname}' - logger.exception(msg) - self.log_warning(msg) - r = get_current_redis(InventoryTask.config) - peerings = r.get(f'snmp-peerings:hosts:{hostname}') - if peerings is None: - raise InventoryTaskError( - f'snmp error with {peerings}' - f' and no cached peering data found') - # unnecessary json encode/decode here ... could be optimized - peerings = json.loads(peerings.decode('utf-8')) - self.log_warning(f'using cached snmp peering data for {hostname}') - - r = get_next_redis(InventoryTask.config) - r.set(f'snmp-peerings:hosts:{hostname}', json.dumps(peerings)) - - self.log_info(f'snmp peering info loaded from {hostname}') - - @app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces') @log_task_entry_and_exit def snmp_refresh_interfaces(self, hostname, community, logical_systems): @@ -164,45 +135,6 @@ def snmp_refresh_interfaces(self, hostname, community, logical_systems): self.log_info(f'snmp interface info loaded from {hostname}') -@app.task(base=InventoryTask, bind=True, name='netconf_refresh_config') -@log_task_entry_and_exit -def netconf_refresh_config(self, hostname, lab=False): - """ - load netconf and save - - save under 'lab:...' if lab is true - - :param self: - :param hostname: - :param lab: - :return: - """ - - redis_key = f'netconf:{hostname}' - if lab: - redis_key = f'lab:{redis_key}' - - try: - netconf_doc = juniper.load_config( - hostname, InventoryTask.config["ssh"]) - netconf_str = etree.tostring(netconf_doc, encoding='unicode') - except (ConnectionError, juniper.NetconfHandlingError): - msg = f'error loading netconf data from {hostname}' - logger.exception(msg) - self.log_warning(msg) - r = get_current_redis(InventoryTask.config) - netconf_str = r.get(redis_key) - if not netconf_str: - raise InventoryTaskError( - f'netconf error with {hostname}' - f' and no cached netconf data found') - self.log_warning(f'using cached netconf data for {hostname}') - - r = get_next_redis(InventoryTask.config) - r.set(redis_key, netconf_str) - self.log_info(f'netconf info loaded from {hostname}') - - def _unmanaged_interfaces(): def _convert(d): @@ -220,75 +152,6 @@ def _unmanaged_interfaces(): InventoryTask.config.get('unmanaged-interfaces', [])) -@app.task(base=InventoryTask, bind=True, - name='update_neteng_managed_device_list') -@log_task_entry_and_exit -def update_neteng_managed_device_list(self): - self.log_info('querying netdash for managed routers') - - routers = list(juniper.load_routers_from_netdash( - InventoryTask.config['managed-routers'])) - - self.log_info(f'found {len(routers)} routers, saving details') - - r = get_next_redis(InventoryTask.config) - r.set('netdash', json.dumps(routers).encode('utf-8')) - self.log_info(f'saved {len(routers)} managed routers') - - -def load_netconf_data(hostname, lab=False): - """ - this method should only be called from a task - - :param hostname: - :param lab: True to look up under 'lab:...' - :return: - """ - redis_key = f'netconf:{hostname}' - if lab: - redis_key = f'lab:{redis_key}' - - r = get_next_redis(InventoryTask.config) - netconf = r.get(redis_key) - if not netconf: - raise InventoryTaskError('no netconf data found for %r' % hostname) - return etree.fromstring(netconf.decode('utf-8')) - - -def clear_cached_classifier_responses(hostname=None): - if hostname: - logger.debug( - 'removing cached classifier responses for %r' % hostname) - else: - logger.debug('removing all cached classifier responses') - - r = get_next_redis(InventoryTask.config) - - def _hostname_keys(): - for k in r.keys('classifier-cache:juniper:%s:*' % hostname): - yield k - - # TODO: very inefficient ... but logically simplest at this point - for k in r.keys('classifier-cache:peer:*'): - value = r.get(k.decode('utf-8')) - if not value: - # deleted in another thread - continue - value = json.loads(value.decode('utf-8')) - interfaces = value.get('interfaces', []) - if hostname in [i['interface']['router'] for i in interfaces]: - yield k - - def _all_keys(): - return r.keys('classifier-cache:*') - - keys_to_delete = _hostname_keys() if hostname else _all_keys() - rp = r.pipeline() - for k in keys_to_delete: - rp.delete(k) - rp.execute() - - @log_task_entry_and_exit def refresh_juniper_bgp_peers(hostname, netconf): host_peerings = list(juniper.all_bgp_peers(netconf)) @@ -357,93 +220,6 @@ def refresh_juniper_interface_list(hostname, netconf, lab=False): rp.execute() -@app.task(base=InventoryTask, bind=True, name='reload_lab_router_config') -@log_task_entry_and_exit -def reload_lab_router_config(self, hostname): - self.log_info(f'loading netconf data for lab {hostname}') - - # load new netconf data, in this thread - netconf_refresh_config.apply(args=[hostname, True]) - - netconf_doc = load_netconf_data(hostname, lab=True) - - refresh_juniper_interface_list(hostname, netconf_doc, lab=True) - - # load snmp indexes - community = juniper.snmp_community_string(netconf_doc) - if not community: - raise InventoryTaskError( - f'error extracting community string for {hostname}') - else: - self.log_info(f'refreshing snmp interface indexes for {hostname}') - logical_systems = juniper.logical_systems(netconf_doc) - - # load snmp data, in this thread - snmp_refresh_interfaces.apply( - args=[hostname, community, logical_systems]) - - self.log_info(f'updated configuration for lab {hostname}') - - -@app.task(base=InventoryTask, bind=True, name='reload_router_config') -@log_task_entry_and_exit -def reload_router_config(self, hostname): - self.log_info(f'loading netconf data for {hostname}') - - # get the timestamp for the current netconf data - current_netconf_timestamp = None - try: - netconf_doc = load_netconf_data(hostname) - current_netconf_timestamp \ - = juniper.netconf_changed_timestamp(netconf_doc) - logger.debug( - 'current netconf timestamp: %r' % current_netconf_timestamp) - except InventoryTaskError: - # NOTE: should always reach here, - # since we always erase everything before starting - pass # ok at this point if not found - - # load new netconf data, in this thread - netconf_refresh_config.apply(args=[hostname, False]) - - netconf_doc = load_netconf_data(hostname) - - # return if new timestamp is the same as the original timestamp - new_netconf_timestamp = juniper.netconf_changed_timestamp(netconf_doc) - assert new_netconf_timestamp, \ - 'no timestamp available for new netconf data' - if new_netconf_timestamp == current_netconf_timestamp: - msg = f'no timestamp change for {hostname} netconf data' - logger.debug(msg) - self.log_info(msg) - return - - # clear cached classifier responses for this router, and - # refresh peering data - self.log_info(f'refreshing peers & clearing cache for {hostname}') - refresh_juniper_bgp_peers(hostname, netconf_doc) - refresh_juniper_interface_list(hostname, netconf_doc) - - # load snmp indexes - community = juniper.snmp_community_string(netconf_doc) - if not community: - raise InventoryTaskError( - f'error extracting community string for {hostname}') - else: - self.log_info(f'refreshing snmp interface indexes for {hostname}') - logical_systems = juniper.logical_systems(netconf_doc) - - # load snmp data, in this thread - snmp_refresh_interfaces.apply( - args=[hostname, community, logical_systems]) - snmp_refresh_peerings.apply( - args=[hostname, community, logical_systems]) - - clear_cached_classifier_responses(None) - self.log_info(f'updated configuration for {hostname}') - - -# updated with transaction def _erase_next_db(config): """ flush next db, but first save latch and then restore afterwards @@ -480,77 +256,6 @@ def _erase_next_db(config): update_timestamp=saved_latch.get('update-started', 0)) -@log_task_entry_and_exit -def launch_refresh_cache_all(config): - """ - utility function intended to be called outside of the worker process - :param config: config structure as defined in config.py - :return: - """ - - try: - _erase_next_db(config) - update_latch_status(config, pending=True) - - monitor.clear_joblog(get_current_redis(config)) - - # first batch of subtasks: refresh cached IMS location data - subtasks = [ - update_neteng_managed_device_list.apply_async(), - update_equipment_locations.apply_async(), - update_lg_routers.apply_async(), - ] - [x.get() for x in subtasks] - - # now launch the task whose only purpose is to - # act as a convenient parent for all of the remaining tasks - t = internal_refresh_phase_2.apply_async() - return t.id - - except (RedisError, KombuError): - update_latch_status(config, pending=False, failure=True) - logger.exception('error launching refresh subtasks') - raise - - -@app.task(base=InventoryTask, bind=True, name='internal_refresh_phase_2') -@log_task_entry_and_exit -def internal_refresh_phase_2(self): - # second batch of subtasks: - # ims circuit information - try: - - subtasks = [ - update_circuit_hierarchy_and_port_id_services.apply_async() - ] - - r = get_next_redis(InventoryTask.config) - routers = r.get('netdash') - assert routers - netdash_equipment = json.loads(routers.decode('utf-8')) - for hostname in netdash_equipment: - logger.debug(f'queueing router refresh jobs for {hostname}') - subtasks.append(reload_router_config.apply_async(args=[hostname])) - - lab_routers = InventoryTask.config.get('lab-routers', []) - for hostname in lab_routers: - logger.debug('queueing router refresh jobs for lab %r' % hostname) - subtasks.append( - reload_lab_router_config.apply_async(args=[hostname])) - - pending_task_ids = [x.id for x in subtasks] - - refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)]) - - except KombuError: - # TODO: possible race condition here - # e.g. if one of these tasks takes a long time and another - # update is started, we could end up with strange data - update_latch_status(config, pending=False, failure=True) - logger.exception('error launching refresh phase 2 subtasks') - raise - - def populate_poller_cache(interface_services, r): host_services = defaultdict(dict) for v in interface_services.values(): @@ -575,420 +280,6 @@ def populate_poller_cache(interface_services, r): rp.execute() -@app.task( - base=InventoryTask, - bind=True, - name='update_circuit_hierarchy_and_port_id_services') -@log_task_entry_and_exit -def update_circuit_hierarchy_and_port_id_services(self, use_current=False): - c = InventoryTask.config["ims"] - ds1 = IMS(c['api'], c['username'], c['password']) - ds2 = IMS(c['api'], c['username'], c['password']) - ds3 = IMS(c['api'], c['username'], c['password']) - - locations = {k: v for k, v in ims_data.get_node_locations(ds1)} - tls_names = list(ims_data.get_service_types(ds1)) - customer_contacts = \ - {k: v for k, v in ims_data.get_customer_service_emails(ds1)} - circuit_ids_to_monitor = \ - list(ims_data.get_monitored_circuit_ids(ds1)) - additional_circuit_customer_ids = \ - ims_data.get_circuit_related_customer_ids(ds1) - - hierarchy = None - port_id_details = defaultdict(list) - port_id_services = defaultdict(list) - interface_services = defaultdict(list) - services_by_type = {} - - def _convert_to_bits(value, unit): - unit = unit.lower() - conversions = { - 'm': 1 << 20, - 'mb': 1 << 20, - 'g': 1 << 30, - 'gbe': 1 << 30, - } - return int(value) * conversions[unit] - - def _get_speed(circuit_id): - c = hierarchy[circuit_id] - if c['status'] != 'operational': - return 0 - pattern = re.compile(r'^(\d+)([a-zA-z]+)$') - m = pattern.match(c['speed']) - if m: - try: - return _convert_to_bits(m[1], m[2]) - except KeyError as e: - logger.debug(f'Could not find key: {e} ' - f'for circuit: {circuit_id}') - return 0 - else: - if c['circuit-type'] == 'service' \ - or c['product'].lower() == 'ethernet': - return sum( - (_get_speed(x) for x in c['carrier-circuits']) - ) - else: - return 0 - - def _get_circuit_contacts(c): - customer_ids = {c['customerid']} - customer_ids.update(additional_circuit_customer_ids.get(c['id'], [])) - return set().union( - *[customer_contacts.get(cid, []) for cid in customer_ids]) - - def _populate_hierarchy(): - nonlocal hierarchy - hierarchy = {} - for d in ims_data.get_circuit_hierarchy(ds1): - hierarchy[d['id']] = d - d['contacts'] = sorted(list(_get_circuit_contacts(d))) - logger.debug("hierarchy complete") - - def _populate_port_id_details(): - nonlocal port_id_details - for x in ims_data.get_port_details(ds2): - pd = port_id_details[x['port_id']] - pd.append(x) - logger.debug("Port details complete") - - def _populate_circuit_info(): - for x in ims_data.get_port_id_services(ds3): - port_id_services[x['port_a_id']].append(x) - logger.debug("port circuits complete") - - hierarchy_thread = threading.Thread(target=_populate_hierarchy) - hierarchy_thread.start() - - port_id_details_thread = threading.Thread(target=_populate_port_id_details) - port_id_details_thread.start() - - circuit_info_thread = threading.Thread(target=_populate_circuit_info) - circuit_info_thread.start() - - hierarchy_thread.join() - circuit_info_thread.join() - port_id_details_thread.join() - - def _get_fibre_routes(c_id): - _circ = hierarchy.get(c_id, None) - if _circ is None: - return - if _circ['speed'].lower() == 'fibre_route': - yield _circ['id'] - else: - for cc in _circ['carrier-circuits']: - yield from _get_fibre_routes(cc) - - # whilst this is a service type the top level for reporting - # are the BGP services on top of it - tls_names.remove('IP PEERING - NON R&E (PUBLIC)') - - def _get_related_services(circuit_id: str) -> List[dict]: - rs = {} - c = hierarchy.get(circuit_id, None) - if c: - - if c['circuit-type'] == 'service': - rs[c['id']] = { - 'id': c['id'], - 'name': c['name'], - 'circuit_type': c['circuit-type'], - 'service_type': c['product'], - 'project': c['project'], - 'contacts': c['contacts'] - } - if c['id'] in circuit_ids_to_monitor: - rs[c['id']]['status'] = c['status'] - else: - rs[c['id']]['status'] = 'non-monitored' - - if c['sub-circuits']: - for sub in c['sub-circuits']: - temp_parents = \ - _get_related_services(sub) - rs.update({t['id']: t for t in temp_parents}) - return list(rs.values()) - - def _format_service(s): - - if s['circuit_type'] == 'service' \ - and s['id'] not in circuit_ids_to_monitor: - s['status'] = 'non-monitored' - pd_a = port_id_details[s['port_a_id']][0] - location_a = locations.get(pd_a['equipment_name'], None) - if location_a: - loc_a = location_a['pop'] - else: - loc_a = locations['UNKNOWN_LOC']['pop'] - logger.warning( - f'Unable to find location for {pd_a["equipment_name"]} - ' - f'Service ID {s["id"]}') - s['pop_name'] = loc_a['name'] - s['pop_abbreviation'] = loc_a['abbreviation'] - s['equipment'] = pd_a['equipment_name'] - s['card_id'] = '' # this is redundant I believe - s['port'] = pd_a['interface_name'] - s['logical_unit'] = '' # this is redundant I believe - if 'port_b_id' in s: - pd_b = port_id_details[s['port_b_id']][0] - location_b = locations.get(pd_b['equipment_name'], None) - if location_b: - loc_b = location_b['pop'] - else: - loc_b = locations['UNKNOWN_LOC']['pop'] - logger.warning( - f'Unable to find location for {pd_b["equipment_name"]} - ' - f'Service ID {s["id"]}') - - s['other_end_pop_name'] = loc_b['name'] - s['other_end_pop_abbreviation'] = loc_b['abbreviation'] - s['other_end_equipment'] = pd_b['equipment_name'] - s['other_end_port'] = pd_b['interface_name'] - else: - s['other_end_pop_name'] = '' - s['other_end_pop_abbreviation'] = '' - s['other_end_equipment'] = '' - s['other_end_port'] = '' - - s.pop('port_a_id', None) - s.pop('port_b_id', None) - - # using a dict to ensure no duplicates - node_pair_services = defaultdict(dict) - - for key, value in port_id_details.items(): - for details in value: - k = f"{details['equipment_name']}:" \ - f"{details['interface_name']}" - circuits = port_id_services.get(details['port_id'], []) - - for circ in circuits: - contacts = _get_circuit_contacts(circ) - circ['fibre-routes'] = [] - for x in set(_get_fibre_routes(circ['id'])): - c = { - 'id': hierarchy[x]['id'], - 'name': hierarchy[x]['name'], - 'status': hierarchy[x]['status'] - } - circ['fibre-routes'].append(c) - - circ['related-services'] = \ - _get_related_services(circ['id']) - - for tlc in circ['related-services']: - contacts.update(tlc.pop('contacts')) - circ['contacts'] = sorted(list(contacts)) - - circ['calculated-speed'] = _get_speed(circ['id']) - _format_service(circ) - - type_services = services_by_type.setdefault( - ims_sorted_service_type_key(circ['service_type']), dict()) - type_services[circ['id']] = circ - if circ['other_end_equipment']: - node_pair_services[ - f"{circ['equipment']}/{circ['other_end_equipment']}" - ][circ['id']] = circ - - interface_services[k].extend(circuits) - - if use_current: - r = get_current_redis(InventoryTask.config) - else: - r = get_next_redis(InventoryTask.config) - rp = r.pipeline() - for k in r.scan_iter('ims:circuit_hierarchy:*', count=1000): - rp.delete(k) - rp.execute() - rp = r.pipeline() - for k in r.scan_iter('ims:interface_services:*', count=1000): - rp.delete(k) - rp.execute() - rp = r.pipeline() - for k in r.scan_iter('ims:node_pair_services:*', count=1000): - rp.delete(k) - rp.execute() - rp = r.pipeline() - for k in r.scan_iter('ims:access_services:*', count=1000): - rp.delete(k) - for k in r.scan_iter('ims:gws_indirect:*', count=1000): - rp.delete(k) - rp.execute() - rp = r.pipeline() - for circ in hierarchy.values(): - rp.set(f'ims:circuit_hierarchy:{circ["id"]}', json.dumps([circ])) - rp.execute() - rp = r.pipeline() - for k, v in interface_services.items(): - rp.set( - f'ims:interface_services:{k}', - json.dumps(v)) - rp.execute() - rp = r.pipeline() - for k, v in node_pair_services.items(): - rp.set( - f'ims:node_pair_services:{k}', - json.dumps(list(v.values()))) - rp.execute() - rp = r.pipeline() - - populate_poller_cache(interface_services, r) - - for service_type, services in services_by_type.items(): - for v in services.values(): - rp.set( - f'ims:services:{service_type}:{v["name"]}', - json.dumps({ - 'id': v['id'], - 'name': v['name'], - 'project': v['project'], - 'here': { - 'pop': { - 'name': v['pop_name'], - 'abbreviation': v['pop_abbreviation'] - }, - 'equipment': v['equipment'], - 'port': v['port'], - }, - 'there': { - 'pop': { - 'name': v['other_end_pop_name'], - 'abbreviation': v['other_end_pop_abbreviation'] - }, - 'equipment': v['other_end_equipment'], - 'port': v['other_end_port'], - }, - 'speed_value': v['calculated-speed'], - 'speed_unit': 'n/a', - 'status': v['status'], - 'type': v['service_type'] - })) - - rp.execute() - - -@app.task(base=InventoryTask, bind=True, name='update_equipment_locations') -@log_task_entry_and_exit -def update_equipment_locations(self, use_current=False): - if use_current: - r = get_current_redis(InventoryTask.config) - else: - r = get_next_redis(InventoryTask.config) - rp = r.pipeline() - for k in r.scan_iter('ims:location:*', count=1000): - rp.delete(k) - rp.execute() - - c = InventoryTask.config["ims"] - ds = IMS(c['api'], c['username'], c['password']) - - rp = r.pipeline() - hostnames_found = set() - for h, d in ims_data.get_node_locations(ds): - # put into a list to match non-IMS version - rp.set(f'ims:location:{h}', json.dumps([d])) - if h in hostnames_found: - logger.debug(f'Multiple entries for {h}') - hostnames_found.add(h) - rp.execute() - - -@app.task(base=InventoryTask, bind=True, name='update_lg_routers') -@log_task_entry_and_exit -def update_lg_routers(self, use_current=False): - if use_current: - r = get_current_redis(InventoryTask.config) - for k in r.scan_iter('ims:lg:*'): - r.delete(k) - else: - r = get_next_redis(InventoryTask.config) - - for k in r.scan_iter('ims:lg:*'): - r.delete(k) - c = InventoryTask.config["ims"] - ds = IMS(c['api'], c['username'], c['password']) - - for router in ims_data.lookup_lg_routers(ds): - r.set(f'ims:lg:{router["equipment name"]}', json.dumps(router)) - - -def _wait_for_tasks(task_ids, update_callback=lambda s: None): - - all_successful = True - - start_time = time.time() - while task_ids and time.time() - start_time < FINALIZER_TIMEOUT_S: - update_callback('waiting for tasks to complete: %r' % task_ids) - time.sleep(FINALIZER_POLLING_FREQUENCY_S) - - def _task_and_children_result(id): - tasks = list(check_task_status(id)) - return { - 'error': any([t['ready'] and not t['success'] for t in tasks]), - 'ready': all([t['ready'] for t in tasks]) - } - - results = dict([ - (id, _task_and_children_result(id)) - for id in task_ids]) - - if any([t['error'] for t in results.values()]): - all_successful = False - - task_ids = [ - id for id, status in results.items() - if not status['ready']] - - if task_ids: - raise InventoryTaskError( - 'timeout waiting for pending tasks to complete') - if not all_successful: - raise InventoryTaskError( - 'some tasks finished with an error') - - update_callback('pending tasks completed in {} seconds'.format( - time.time() - start_time)) - - -@app.task(base=InventoryTask, bind=True, name='refresh_finalizer') -@log_task_entry_and_exit -def refresh_finalizer(self, pending_task_ids_json): - - # TODO: if more types of errors appear, use a finally block - - input_schema = { - "$schema": "http://json-schema.org/draft-07/schema#", - "type": "array", - "items": {"type": "string"} - } - - try: - task_ids = json.loads(pending_task_ids_json) - logger.debug('task_ids: %r' % task_ids) - jsonschema.validate(task_ids, input_schema) - - _wait_for_tasks(task_ids, update_callback=self.log_info) - _build_subnet_db(update_callback=self.log_info) - _build_snmp_peering_db(update_callback=self.log_info) - _build_juniper_peering_db(update_callback=self.log_info) - populate_poller_interfaces_cache(warning_callback=self.log_warning) - - except (jsonschema.ValidationError, - json.JSONDecodeError, - InventoryTaskError, - RedisError, - KombuError) as e: - update_latch_status(InventoryTask.config, failure=True) - raise e - - latch_db(InventoryTask.config) - self.log_info('latched current/next dbs') - - def _build_subnet_db(update_callback=lambda s: None): r = get_next_redis(InventoryTask.config) @@ -1189,47 +480,52 @@ def check_task_status(task_id, parent=None, forget=False): yield result -# =================================== chorded - currently only here for testing - -# new @app.task(base=InventoryTask, bind=True, name='update_entry_point') @log_task_entry_and_exit def update_entry_point(self): - routers = retrieve_and_persist_neteng_managed_device_list( - info_callback=self.log_info, - warning_callback=self.log_warning - ) - lab_routers = InventoryTask.config.get('lab-routers', []) - - _erase_next_db(InventoryTask.config) - update_latch_status(InventoryTask.config, pending=True) - - tasks = chord( - ( - ims_task.s().on_error(task_error_handler.s()), - chord( - (reload_router_config_chorded.s(r) for r in routers), - empty_task.si('router tasks complete') + + try: + _erase_next_db(InventoryTask.config) + update_latch_status(InventoryTask.config, pending=True) + monitor.clear_joblog(get_current_redis(config)) + self.log_info("Starting update") + + routers = retrieve_and_persist_neteng_managed_device_list( + info_callback=self.log_info, + warning_callback=self.log_warning + ) + lab_routers = InventoryTask.config.get('lab-routers', []) + + chord( + ( + ims_task.s().on_error(task_error_handler.s()), + chord( + (reload_router_config_chorded.s(r) for r in routers), + empty_task.si('router tasks complete') + ), + chord( + (reload_lab_router_config_chorded.s(r) + for r in lab_routers), + empty_task.si('lab router tasks complete') + ) ), - chord( - (reload_lab_router_config_chorded.s(r) for r in lab_routers), - empty_task.si('lab router tasks complete') - ) - ), - final_task.si().on_error(task_error_handler.s()) - )() - return tasks + final_task.si().on_error(task_error_handler.s()) + )() + + return self.request.id + except (RedisError, KombuError): + update_latch_status(config, pending=False, failure=True) + logger.exception('error launching refresh subtasks') + raise -# new @app.task def task_error_handler(request, exc, traceback): update_latch_status(InventoryTask.config, pending=False, failure=True) logger.warning('Task {0!r} raised error: {1!r}'.format(request.id, exc)) -# new @app.task(base=InventoryTask, bind=True, name='empty_task') def empty_task(self, message): logger.warning(f'message from empty task: {message}') @@ -1273,12 +569,11 @@ def retrieve_and_persist_neteng_managed_device_list( return netdash_equipment -# updated @app.task(base=InventoryTask, bind=True, name='reload_lab_router_config') @log_task_entry_and_exit def reload_lab_router_config_chorded(self, hostname): try: - self.log_info(f'loading netconf data for lab {hostname} RL') + self.log_info(f'loading netconf data for lab {hostname}') # load new netconf data, in this thread netconf_str = retrieve_and_persist_netconf_config( @@ -1305,12 +600,11 @@ def reload_lab_router_config_chorded(self, hostname): update_latch_status(InventoryTask.config, pending=True, failure=True) -# updated @app.task(base=InventoryTask, bind=True, name='reload_router_config') @log_task_entry_and_exit def reload_router_config_chorded(self, hostname): try: - self.log_info(f'loading netconf data for {hostname} RL') + self.log_info(f'loading netconf data for {hostname}') netconf_str = retrieve_and_persist_netconf_config( hostname, update_callback=self.log_warning) @@ -1342,7 +636,6 @@ def reload_router_config_chorded(self, hostname): update_latch_status(InventoryTask.config, pending=True, failure=True) -# new def retrieve_and_persist_netconf_config( hostname, lab=False, update_callback=lambda s: None): redis_key = f'netconf:{hostname}' @@ -1374,7 +667,6 @@ def retrieve_and_persist_netconf_config( return netconf_str -# updated as is no longer a task @log_task_entry_and_exit def snmp_refresh_interfaces_chorded( hostname, community, logical_systems, update_callback=lambda s: None): @@ -1413,7 +705,6 @@ def snmp_refresh_interfaces_chorded( update_callback(f'snmp interface info loaded from {hostname}') -# updated as is no longer a task @log_task_entry_and_exit def snmp_refresh_peerings_chorded( hostname, community, logical_systems, update_callback=lambda S: None): @@ -1440,11 +731,9 @@ def snmp_refresh_peerings_chorded( update_callback(f'snmp peering info loaded from {hostname}') -# new @app.task(base=InventoryTask, bind=True, name='ims_task') @log_task_entry_and_exit def ims_task(self, use_current=False): - try: extracted_data = extract_ims_data() transformed_data = transform_ims_data(extracted_data) @@ -1456,7 +745,6 @@ def ims_task(self, use_current=False): update_latch_status(InventoryTask.config, pending=True, failure=True) -# new def extract_ims_data(): c = InventoryTask.config["ims"] @@ -1561,7 +849,6 @@ def extract_ims_data(): } -# new def transform_ims_data(data): locations = data['locations'] customer_contacts = data['customer_contacts'] @@ -1742,7 +1029,6 @@ def transform_ims_data(data): } -# new def persist_ims_data(data, use_current=False): hierarchy = data['hierarchy'] locations = data['locations'] @@ -1833,7 +1119,6 @@ def persist_ims_data(data, use_current=False): rp.execute() -# new @app.task(base=InventoryTask, bind=True, name='final_task') @log_task_entry_and_exit def final_task(self): diff --git a/test/per_router/test_celery_worker.py b/test/per_router/test_celery_worker.py index e071f5f8981466f87b2281144f0e3ab2e0b45e95..9723a00f4e0cb87862b10ad8058da8557c8b71ea 100644 --- a/test/per_router/test_celery_worker.py +++ b/test/per_router/test_celery_worker.py @@ -19,7 +19,7 @@ def backend_db(): def test_netconf_refresh_config(mocked_worker_module, router): del backend_db()['netconf:' + router] - worker.netconf_refresh_config(router) + worker.reload_router_config_chorded(router) assert backend_db()['netconf:' + router] @@ -51,7 +51,7 @@ def test_snmp_refresh_peerings(mocked_worker_module, router): for k in list(_ifc_keys()): del backend_db()[k] - worker.snmp_refresh_peerings(router, 'fake-community', []) + worker.snmp_refresh_peerings_chorded(router, 'fake-community', []) assert list(_ifc_keys()) @@ -80,15 +80,15 @@ def test_reload_router_config(mocked_worker_module, router, mocker): key = 'netconf:' + args[0] backend_db()[key] = saved_data[key] mocker.patch( - 'inventory_provider.tasks.worker.netconf_refresh_config.apply', + 'inventory_provider.tasks.worker.reload_router_config_chorded.apply', _mocked_netconf_refresh_config_apply) - def _mocked_snmp_refresh_peerings_apply(args): + def _mocked_reload_router_config_chorded_apply(args): assert len(args) == 3 backend_db().update(saved_peerings) mocker.patch( - 'inventory_provider.tasks.worker.snmp_refresh_peerings.apply', - _mocked_snmp_refresh_peerings_apply) + 'inventory_provider.tasks.worker.reload_router_config_chorded.apply', + _mocked_reload_router_config_chorded_apply) def _mocked_snmp_refresh_interfaces_apply(args): assert len(args) == 3 @@ -105,6 +105,6 @@ def test_reload_router_config(mocked_worker_module, router, mocker): 'inventory_provider.tasks.worker.InventoryTask.update_state', _mocked_update_status) - worker.reload_router_config(router) + worker.reload_router_config_chorded(router) assert 'netconf:' + router in backend_db() assert 'snmp-interfaces:' + router in backend_db() diff --git a/test/per_router/test_classifier_data.py b/test/per_router/test_classifier_data.py deleted file mode 100644 index 37774b4469d86f8168de0ad7b08d8dfeb4d91a30..0000000000000000000000000000000000000000 --- a/test/per_router/test_classifier_data.py +++ /dev/null @@ -1,24 +0,0 @@ -from inventory_provider.tasks import worker -from inventory_provider.tasks.common import _get_redis - - -def backend_db(): - return _get_redis({ - 'redis': { - 'hostname': None, - 'port': None - }, - 'redis-databases': [0, 7] - }).db - - -def test_clear_classifier_cache( - router, - mocked_redis, - data_config, - classifier_cache_test_entries): - worker.InventoryTask.config = data_config - backend_db().update(classifier_cache_test_entries) - worker.clear_cached_classifier_responses(router) - for k in backend_db(): - assert not k.startswith('classifier-cache:%s:' % router) diff --git a/test/test_job_routes.py b/test/test_job_routes.py index f3a4df17d8541322bda4af2634d91ab59e90bb23..c280f57f882959df623e8f50e8fbd9f0dab71fa3 100644 --- a/test/test_job_routes.py +++ b/test/test_job_routes.py @@ -24,9 +24,9 @@ def backend_db(): def test_job_update_all(client, mocker): expected_task_id = 'xyz@123#456' - launch_refresh_cache_all = mocker.patch( - 'inventory_provider.tasks.worker.launch_refresh_cache_all') - launch_refresh_cache_all.return_value = expected_task_id + update_entry_point = mocker.patch( + 'inventory_provider.tasks.worker.update_entry_point.delay') + update_entry_point.return_value.get.return_value = expected_task_id rv = client.post( 'jobs/update', @@ -43,9 +43,9 @@ def test_job_update_all(client, mocker): def test_job_update_force_pending(client, mocker): expected_task_id = 'asf#asdf%111' - launch_refresh_cache_all = mocker.patch( - 'inventory_provider.tasks.worker.launch_refresh_cache_all') - launch_refresh_cache_all.return_value = expected_task_id + update_entry_point = mocker.patch( + 'inventory_provider.tasks.worker.update_entry_point.delay') + update_entry_point.return_value.get.return_value = expected_task_id mocked_get_latch = mocker.patch( 'inventory_provider.routes.jobs.get_latch') @@ -64,7 +64,7 @@ def test_job_update_pending_force_false(client, mocker): def _assert_if_called(*args, **kwargs): assert False mocker.patch( - 'inventory_provider.tasks.worker.launch_refresh_cache_all', + 'inventory_provider.tasks.worker.update_entry_point', _assert_if_called) mocked_get_latch = mocker.patch( @@ -81,7 +81,7 @@ def test_job_update_pending(client, mocker): def _assert_if_called(*args, **kwargs): assert False mocker.patch( - 'inventory_provider.tasks.worker.launch_refresh_cache_all', + 'inventory_provider.tasks.worker.update_entry_point', _assert_if_called) mocked_get_latch = mocker.patch( @@ -104,7 +104,7 @@ class MockedAsyncResult(object): def test_reload_router_config(client, mocker): delay_result = mocker.patch( - 'inventory_provider.tasks.worker.reload_router_config.delay') + 'inventory_provider.tasks.worker.reload_router_config_chorded.delay') delay_result.return_value = MockedAsyncResult('bogus task id') rv = client.post(