Skip to content
Snippets Groups Projects
ims_worker.py 10.09 KiB
import csv
import json
import logging
import subprocess
import tempfile
from collections import defaultdict
from datetime import datetime
from enum import IntFlag
from pathlib import Path
from uuid import uuid4

from kombu.exceptions import KombuError
from redis import RedisError

from inventory_provider.db import ims_data
from inventory_provider.db.ims import IMS
from inventory_provider import environment, config
from inventory_provider.tasks import monitor, data
from inventory_provider.tasks.app import app
from inventory_provider.tasks.common import get_current_redis, \
    get_next_redis, update_latch_status, get_latch, set_latch
from inventory_provider.tasks.worker import InventoryTask, \
    log_task_entry_and_exit, import_unmanaged_interfaces, \
    reload_router_config, refresh_finalizer, update_neteng_managed_device_list, \
    _erase_next_db

environment.setup_logging()

logger = logging.getLogger(__name__)


@log_task_entry_and_exit
def launch_refresh_cache_all(config):
    """
    utility function intended to be called outside of the worker process
    :param config: config structure as defined in config.py
    :return:
    """

    try:
        _erase_next_db(config)
        update_latch_status(config, pending=True)

        monitor.clear_joblog(get_current_redis(config))

        # first batch of subtasks: refresh cached IMS data
        subtasks = [
            update_neteng_managed_device_list.apply_async(),
            update_equipment_locations_ims.apply_async(),
            update_lg_routers_ims.apply_async(),
            update_fibre_spans_ims.apply_async(),
        ]
        [x.get() for x in subtasks]

        # now launch the task whose only purpose is to
        # act as a convenient parent for all of the remaining tasks
        t = internal_refresh_phase_2_ims.apply_async()
        return t.id

    except (RedisError, KombuError):
        update_latch_status(config, pending=False, failure=True)
        logger.exception('error launching refresh subtasks')
        raise


@app.task(base=InventoryTask, bind=True, name='internal_refresh_phase_2_ims')
@log_task_entry_and_exit
def internal_refresh_phase_2_ims(self):
    # second batch of subtasks:
    #   alarms db status cache
    #   juniper netconf & snmp data
    try:

        subtasks = [
            update_circuit_hierarchy_ims.apply_async(),
            update_interfaces_to_port_ids_ims.apply_async(),
            update_port_ids_to_services_ims.apply_async(),
            import_unmanaged_interfaces.apply_async()
        ]

        r = get_next_redis(InventoryTask.config)
        routers = r.get('netdash')
        assert routers
        netdash_equipment = json.loads(routers.decode('utf-8'))
        # for hostname in data.derive_router_hostnames(InventoryTask.config):
        for hostname in netdash_equipment:
            logger.debug(f'queueing router refresh jobs for {hostname}')
            subtasks.append(reload_router_config.apply_async(args=[hostname]))

        pending_task_ids = [x.id for x in subtasks]

        refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])

    except KombuError:
        # TODO: possible race condition here
        # e.g. if one of these tasks takes a long time and another
        # update is started, we could end up with strange data
        update_latch_status(config, pending=False, failure=True)
        logger.exception('error launching refresh phase 2 subtasks')
        raise


@app.task(base=InventoryTask, bind=True, name='update_fibre_spans_ims')
@log_task_entry_and_exit
def update_fibre_spans_ims(self, use_current=False):

    if use_current:
        r = get_current_redis(InventoryTask.config)
    else:
        r = get_next_redis(InventoryTask.config)
    rp = r.pipeline()
    # scan with bigger batches, to mitigate network latency effects
    for key in r.scan_iter('ims:ne_fibre_spans:*', count=1000):
        rp.delete(key)
    rp.execute()

    c = InventoryTask.config["ims"]
    ds = IMS(c['api'], c['username'], c['password'])

    rp = r.pipeline()
    for ne, fs in ims_data.get_fibre_info(ds):
        rp.set(
            f'ims:ne_fibre_spans:{ne}',
            json.dumps(fs))
    rp.execute()


@app.task(
    base=InventoryTask, bind=True, name='update_interfaces_to_port_ids_ims')
@log_task_entry_and_exit
def update_interfaces_to_port_ids_ims(self, use_current=False):

    if use_current:
        r = get_current_redis(InventoryTask.config)
    else:
        r = get_next_redis(InventoryTask.config)

    rp = r.pipeline()
    # scan with bigger batches, to mitigate network latency effects
    for key in r.scan_iter('ims:interface_port_ids:*', count=2000):
        rp.delete(key)
    for key in r.scan_iter('ims:port_id_interface:*', count=2000):
        rp.delete(key)
    rp.execute()

    c = InventoryTask.config["ims"]
    ds = IMS(c['api'], c['username'], c['password'])

    rp = r.pipeline()
    for pd in ims_data.get_port_details(ds):
        d = json.dumps(pd)
        rp.set(
            f'ims:interface_port_ids:{pd["equipment_name"]}'
            f':{pd["interface_name"]}',
            d)
        rp.set(
            f'ims:port_id_interface:{pd["port_id"]}',
            d)
    rp.execute()


@app.task(
    base=InventoryTask, bind=True, name='update_port_ids_to_services_ims')
@log_task_entry_and_exit
def update_port_ids_to_services_ims(self, use_current=False):
    interface_services = defaultdict(list)

    c = InventoryTask.config["ims"]
    ds = IMS(c['api'], c['username'], c['password'])

    for service in ims_data.get_port_id_services(ds):
        interface_services[service["port_a_id"]].append(service)

    if use_current:
        r = get_current_redis(InventoryTask.config)
    else:
        r = get_next_redis(InventoryTask.config)

    rp = r.pipeline()
    # scan with bigger batches, to mitigate network latency effects
    for key in r.scan_iter('ims:port_id_services:*', count=2000):
        rp.delete(key)
    rp.execute()

    rp = r.pipeline()
    for port_id, services in interface_services.items():
        rp.set(
            f'ims:port_id_services:{port_id}',
            json.dumps(services))
    rp.execute()


@app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy_ims')
@log_task_entry_and_exit
def update_circuit_hierarchy_ims(self, use_current=False):

    if use_current:
        r = get_current_redis(InventoryTask.config)
    else:
        r = get_next_redis(InventoryTask.config)
    rp = r.pipeline()
    for k in r.scan_iter('ims:circuit_hierarchy:*', count=2000):
        rp.delete(k)
    rp.execute()

    c = InventoryTask.config["ims"]
    ds = IMS(c['api'], c['username'], c['password'])

    rp = r.pipeline()
    for d in ims_data.get_circuit_hierarchy(ds):
        rp.set(f'ims:circuit_hierarchy:{d["id"]}', json.dumps([d]))
    rp.execute()


@app.task(base=InventoryTask, bind=True, name='update_equipment_locations_ims')
@log_task_entry_and_exit
def update_equipment_locations_ims(self, use_current=False):

    if use_current:
        r = get_current_redis(InventoryTask.config)
    else:
        r = get_next_redis(InventoryTask.config)
    rp = r.pipeline()
    for k in r.scan_iter('ims:location:*', count=1000):
        rp.delete(k)
    rp.execute()

    c = InventoryTask.config["ims"]
    ds = IMS(c['api'], c['username'], c['password'])

    rp = r.pipeline()
    hostnames_found = set()
    for h, d in ims_data.get_node_locations(ds):
        # put into a list to match non-IMS version
        rp.set(f'ims:location:{h}', json.dumps([d]))
        if h in hostnames_found:
            print(f'Multiple entries for {h}')
        hostnames_found.add(h)
    rp.execute()


@app.task(base=InventoryTask, bind=True, name='update_lg_routers_ims')
@log_task_entry_and_exit
def update_lg_routers_ims(self, use_current=False):

    if use_current:
        r = get_current_redis(InventoryTask.config)
        for k in r.scan_iter('ims:lg:*'):
            r.delete(k)
    else:
        r = get_next_redis(InventoryTask.config)

    for k in r.scan_iter('ims:lg:*'):
        r.delete(k)
    c = InventoryTask.config["ims"]
    ds = IMS(c['api'], c['username'], c['password'])

    for router in ims_data.lookup_lg_routers(ds):
        r.set(f'ims:lg:{router["equipment name"]}', json.dumps(router))


class OTRSFiles(IntFlag):
    CUSTOMER_COMPANIES = 1
    CUSTOMER_USERS = 2


@app.task(base=InventoryTask, bind=True, name='export_data_for_otrs')
@log_task_entry_and_exit
def export_data_for_otrs(self, files_to_export=None, export_duplicates=False):
    debug_uuid = uuid4()
    logger.debug(f'debug uuid: {debug_uuid}')
    if files_to_export:
        files_to_export = OTRSFiles(files_to_export)
    else:
        files_to_export = set(OTRSFiles)

    ims_config = InventoryTask.config["ims"]
    otrs_config = InventoryTask.config["otrs-export"]

    command_template = 'rsync -aPq --no-perms --rsh="ssh -l {user} -p 22 -i {key_file} -o \'UserKnownHostsFile {known_hosts}\'" {source_dir}/* {destination}'  # noqa

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_path = Path(temp_dir)
        ds = IMS(
            ims_config['api'],
            ims_config['username'],
            ims_config['password'])

        prefix = datetime.now().strftime('%Y%m%d') + '_'

        if OTRSFiles.CUSTOMER_COMPANIES in files_to_export:
            cus_co_path = temp_path.joinpath(f'{prefix}customer_company.csv')
            with open(cus_co_path, 'w+') as f:
                writer = csv.writer(f, delimiter='^')
                writer.writerows(ims_data.otrs_get_customer_company_rows(ds))

        if OTRSFiles.CUSTOMER_USERS in files_to_export:
            cus_usr_path = temp_path.joinpath(f'{prefix}customer_user.csv')
            with open(cus_usr_path, 'w+') as f:
                writer = csv.writer(f, delimiter='^')
                writer.writerows(ims_data.otrs_get_customer_users_rows(
                    ds, return_duplicates=export_duplicates))

        command = command_template.format(
            user=otrs_config['username'],
            key_file=otrs_config['private-key'],
            known_hosts=otrs_config['known-hosts'],
            source_dir=temp_dir,
            destination=otrs_config['destination']
        )
        subprocess.run(command, shell=True, check=True)
    return debug_uuid