diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index c65e5600886b4d41a993ebef597d748e0fd47d48..ef2850a478c89fb5ae29281016bf0dcd9a49b0dd 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -110,13 +110,43 @@ def update_interfaces_to_services(self): rp = r.pipeline() for equipment_interface, services in interface_services.items(): rp.set( - 'opsdb:interface_services:' + equipment_interface, + f'opsdb:interface_services:{equipment_interface}', json.dumps(services)) rp.execute() logger.debug('<<< update_interfaces_to_services') +@app.task(base=InventoryTask, bind=True) +def update_access_services(self): + logger.debug('>>> update_access_services') + + access_services = {} + with db.connection(InventoryTask.config["ops-db"]) as cx: + for service in opsdb.get_access_services(cx): + + equipment_interface = '%s:%s' % ( + service['equipment'], service['interface_name']) + + if equipment_interface in access_services: + logger.warning( + f'got multiple access services for {equipment_interface}') + + access_services[equipment_interface] = service + + r = get_next_redis(InventoryTask.config) + for key in r.scan_iter('opsdb:access_services:*'): + r.delete(key) + rp = r.pipeline() + for equipment_interface, services in access_services.items(): + rp.set( + f'opsdb:access_services:{equipment_interface}', + json.dumps(services)) + rp.execute() + + logger.debug('<<< update_access_services') + + @app.task(base=InventoryTask, bind=True) def update_lg_routers(self): logger.debug('>>> update_lg_routers') @@ -504,7 +534,8 @@ def launch_refresh_cache_all(config): # juniper netconf & snmp data subtasks = [ update_equipment_locations.apply_async(), - update_lg_routers.apply_async() + update_lg_routers.apply_async(), + update_access_services.apply_async() ] for hostname in _derive_router_hostnames(config): logger.debug('queueing router refresh jobs for %r' % hostname)