import csv
import json
import logging
import subprocess
import tempfile

from collections import defaultdict
from datetime import datetime
from enum import IntFlag
from pathlib import Path
from uuid import uuid4

from kombu.exceptions import KombuError
from redis import RedisError

from inventory_provider.db import ims_data
from inventory_provider.db.ims import IMS
from inventory_provider import environment, config
from inventory_provider.tasks import monitor, data
from inventory_provider.tasks.app import app
from inventory_provider.tasks.common import get_current_redis, \
    get_next_redis, update_latch_status, get_latch, set_latch
from inventory_provider.tasks.worker import InventoryTask, \
    log_task_entry_and_exit, import_unmanaged_interfaces, \
    reload_router_config, refresh_finalizer, \
    update_neteng_managed_device_list, _erase_next_db

environment.setup_logging()

logger = logging.getLogger(__name__)


@log_task_entry_and_exit
def launch_refresh_cache_all(config):
    """
    utility function intended to be called outside of the worker process

    :param config: config structure as defined in config.py
    :return:
    """
    try:
        _erase_next_db(config)
        update_latch_status(config, pending=True)
        monitor.clear_joblog(get_current_redis(config))

        # first batch of subtasks: refresh cached IMS data
        subtasks = [
            update_neteng_managed_device_list.apply_async(),
            update_equipment_locations_ims.apply_async(),
            update_lg_routers_ims.apply_async(),
            update_fibre_spans_ims.apply_async(),
        ]
        [x.get() for x in subtasks]

        # now launch the task whose only purpose is to
        # act as a convenient parent for all of the remaining tasks
        t = internal_refresh_phase_2_ims.apply_async()
        return t.id

    except (RedisError, KombuError):
        update_latch_status(config, pending=False, failure=True)
        logger.exception('error launching refresh subtasks')
        raise
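# A minimal sketch of how launch_refresh_cache_all might be driven from
# outside the worker process (the config file path is illustrative, and
# config.load is assumed to be the loader defined in config.py):
#
#     from inventory_provider import config
#
#     with open('/etc/inventory-provider/config.json') as f:
#         params = config.load(f)
#     task_id = launch_refresh_cache_all(params)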
@app.task(base=InventoryTask, bind=True, name='internal_refresh_phase_2_ims')
@log_task_entry_and_exit
def internal_refresh_phase_2_ims(self):
    # second batch of subtasks:
    #   alarms db status cache
    #   juniper netconf & snmp data
    try:
        subtasks = [
            update_circuit_hierarchy_ims.apply_async(),
            update_interfaces_to_port_ids_ims.apply_async(),
            update_port_ids_to_services_ims.apply_async(),
            import_unmanaged_interfaces.apply_async()
        ]

        r = get_next_redis(InventoryTask.config)
        routers = r.get('netdash')
        assert routers
        netdash_equipment = json.loads(routers.decode('utf-8'))

        # for hostname in data.derive_router_hostnames(InventoryTask.config):
        for hostname in netdash_equipment:
            logger.debug(f'queueing router refresh jobs for {hostname}')
            subtasks.append(reload_router_config.apply_async(args=[hostname]))

        pending_task_ids = [x.id for x in subtasks]

        refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])

    except KombuError:
        # TODO: possible race condition here
        # e.g. if one of these tasks takes a long time and another
        # update is started, we could end up with strange data
        update_latch_status(
            InventoryTask.config, pending=False, failure=True)
        logger.exception('error launching refresh phase 2 subtasks')
        raise


@app.task(base=InventoryTask, bind=True, name='update_fibre_spans_ims')
@log_task_entry_and_exit
def update_fibre_spans_ims(self, use_current=False):
    if use_current:
        r = get_current_redis(InventoryTask.config)
    else:
        r = get_next_redis(InventoryTask.config)

    rp = r.pipeline()
    # scan with bigger batches, to mitigate network latency effects
    for key in r.scan_iter('ims:ne_fibre_spans:*', count=1000):
        rp.delete(key)
    rp.execute()

    c = InventoryTask.config["ims"]
    ds = IMS(c['api'], c['username'], c['password'])

    rp = r.pipeline()
    for ne, fs in ims_data.get_fibre_info(ds):
        rp.set(
            f'ims:ne_fibre_spans:{ne}',
            json.dumps(fs))

    rp.execute()


@app.task(
    base=InventoryTask,
    bind=True,
    name='update_interfaces_to_port_ids_ims')
@log_task_entry_and_exit
def update_interfaces_to_port_ids_ims(self, use_current=False):
    if use_current:
        r = get_current_redis(InventoryTask.config)
    else:
        r = get_next_redis(InventoryTask.config)

    rp = r.pipeline()
    # scan with bigger batches, to mitigate network latency effects
    for key in r.scan_iter('ims:interface_port_ids:*', count=2000):
        rp.delete(key)
    for key in r.scan_iter('ims:port_id_interface:*', count=2000):
        rp.delete(key)
    rp.execute()

    c = InventoryTask.config["ims"]
    ds = IMS(c['api'], c['username'], c['password'])

    rp = r.pipeline()
    for pd in ims_data.get_port_details(ds):
        d = json.dumps(pd)
        rp.set(
            f'ims:interface_port_ids:{pd["equipment_name"]}'
            f':{pd["interface_name"]}', d)
        rp.set(
            f'ims:port_id_interface:{pd["port_id"]}', d)

    rp.execute()


@app.task(
    base=InventoryTask,
    bind=True,
    name='update_port_ids_to_services_ims')
@log_task_entry_and_exit
def update_port_ids_to_services_ims(self, use_current=False):
    interface_services = defaultdict(list)
    c = InventoryTask.config["ims"]
    ds = IMS(c['api'], c['username'], c['password'])
    for service in ims_data.get_port_id_services(ds):
        interface_services[service["port_a_id"]].append(service)

    if use_current:
        r = get_current_redis(InventoryTask.config)
    else:
        r = get_next_redis(InventoryTask.config)

    rp = r.pipeline()
    # scan with bigger batches, to mitigate network latency effects
    for key in r.scan_iter('ims:port_id_services:*', count=2000):
        rp.delete(key)
    rp.execute()

    rp = r.pipeline()
    for port_id, services in interface_services.items():
        rp.set(
            f'ims:port_id_services:{port_id}',
            json.dumps(services))

    rp.execute()


@app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy_ims')
@log_task_entry_and_exit
def update_circuit_hierarchy_ims(self, use_current=False):
    if use_current:
        r = get_current_redis(InventoryTask.config)
    else:
        r = get_next_redis(InventoryTask.config)

    rp = r.pipeline()
    for k in r.scan_iter('ims:circuit_hierarchy:*', count=2000):
        rp.delete(k)
    rp.execute()

    c = InventoryTask.config["ims"]
    ds = IMS(c['api'], c['username'], c['password'])

    rp = r.pipeline()
    for d in ims_data.get_circuit_hierarchy(ds):
        rp.set(f'ims:circuit_hierarchy:{d["id"]}', json.dumps([d]))
    rp.execute()


@app.task(base=InventoryTask, bind=True, name='update_equipment_locations_ims')
@log_task_entry_and_exit
def update_equipment_locations_ims(self, use_current=False):
    if use_current:
        r = get_current_redis(InventoryTask.config)
    else:
        r = get_next_redis(InventoryTask.config)

    rp = r.pipeline()
    for k in r.scan_iter('ims:location:*', count=1000):
        rp.delete(k)
    rp.execute()

    c = InventoryTask.config["ims"]
    ds = IMS(c['api'], c['username'], c['password'])

    rp = r.pipeline()
    hostnames_found = set()
    for h, d in ims_data.get_node_locations(ds):
        # put into a list to match non-IMS version
        rp.set(f'ims:location:{h}', json.dumps([d]))
        if h in hostnames_found:
            logger.warning(f'multiple location entries for {h}')
        hostnames_found.add(h)
    rp.execute()
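# For reference, the redis keys populated by the IMS refresh tasks in this
# module (all values are JSON-encoded):
#
#   ims:ne_fibre_spans:<ne>              fibre spans per network element
#   ims:interface_port_ids:<equipment>:<interface>
#                                        port details, keyed by interface
#   ims:port_id_interface:<port id>      the same port details, by port id
#   ims:port_id_services:<port id>       services associated with a port
#   ims:circuit_hierarchy:<circuit id>   circuit hierarchy records
#   ims:location:<hostname>              equipment location records
#   ims:lg:<equipment name>              looking-glass router records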
@app.task(base=InventoryTask, bind=True, name='update_lg_routers_ims')
@log_task_entry_and_exit
def update_lg_routers_ims(self, use_current=False):
    if use_current:
        r = get_current_redis(InventoryTask.config)
    else:
        r = get_next_redis(InventoryTask.config)

    for k in r.scan_iter('ims:lg:*'):
        r.delete(k)

    c = InventoryTask.config["ims"]
    ds = IMS(c['api'], c['username'], c['password'])

    for router in ims_data.lookup_lg_routers(ds):
        r.set(f'ims:lg:{router["equipment name"]}', json.dumps(router))


class OTRSFiles(IntFlag):
    CUSTOMER_COMPANIES = 1
    CUSTOMER_USERS = 2


@app.task(base=InventoryTask, bind=True, name='export_data_for_otrs')
@log_task_entry_and_exit
def export_data_for_otrs(self, files_to_export=None, export_duplicates=False):
    debug_uuid = uuid4()
    logger.debug(f'debug uuid: {debug_uuid}')

    if files_to_export:
        files_to_export = OTRSFiles(files_to_export)
    else:
        files_to_export = set(OTRSFiles)

    ims_config = InventoryTask.config["ims"]
    otrs_config = InventoryTask.config["otrs-export"]

    command_template = 'rsync -aPq --no-perms --rsh="ssh -l {user} -p 22 -i {key_file} -o \'UserKnownHostsFile {known_hosts}\'" {source_dir}/* {destination}'  # noqa

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_path = Path(temp_dir)
        ds = IMS(
            ims_config['api'],
            ims_config['username'],
            ims_config['password'])
        prefix = datetime.now().strftime('%Y%m%d') + '_'

        if OTRSFiles.CUSTOMER_COMPANIES in files_to_export:
            cus_co_path = temp_path.joinpath(f'{prefix}customer_company.csv')
            with open(cus_co_path, 'w+') as f:
                writer = csv.writer(f, delimiter='^')
                writer.writerows(ims_data.otrs_get_customer_company_rows(ds))

        if OTRSFiles.CUSTOMER_USERS in files_to_export:
            cus_usr_path = temp_path.joinpath(f'{prefix}customer_user.csv')
            with open(cus_usr_path, 'w+') as f:
                writer = csv.writer(f, delimiter='^')
                writer.writerows(ims_data.otrs_get_customer_users_rows(
                    ds, return_duplicates=export_duplicates))

        command = command_template.format(
            user=otrs_config['username'],
            key_file=otrs_config['private-key'],
            known_hosts=otrs_config['known-hosts'],
            source_dir=temp_dir,
            destination=otrs_config['destination']
        )
        subprocess.run(command, shell=True, check=True)

    return debug_uuid
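# export_data_for_otrs is registered as a celery task above; when triggered
# remotely, files_to_export (if given) is the integer value of an OTRSFiles
# combination. A hedged example of queueing it (scheduling and queue
# configuration are deployment-specific):
#
#     export_data_for_otrs.apply_async(kwargs={
#         'files_to_export': int(
#             OTRSFiles.CUSTOMER_COMPANIES | OTRSFiles.CUSTOMER_USERS)
#     })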