From 34e97e6495e1e44058f264e154e442e7cbb48659 Mon Sep 17 00:00:00 2001 From: Erik Reid <erik.reid@geant.org> Date: Fri, 16 Nov 2018 14:46:48 +0100 Subject: [PATCH] initial working celery tasks --- inventory_provider/juniper.py | 101 +++++++++++++++------------ inventory_provider/router_details.py | 3 +- inventory_provider/tasks/__init__.py | 0 inventory_provider/tasks/app.py | 6 ++ inventory_provider/tasks/config.py | 5 ++ inventory_provider/tasks/worker.py | 71 +++++++++++++++++++ requirements.txt | 1 + setup.py | 3 +- 8 files changed, 142 insertions(+), 48 deletions(-) create mode 100644 inventory_provider/tasks/__init__.py create mode 100644 inventory_provider/tasks/app.py create mode 100644 inventory_provider/tasks/config.py create mode 100644 inventory_provider/tasks/worker.py diff --git a/inventory_provider/juniper.py b/inventory_provider/juniper.py index 54411375..9780d11d 100644 --- a/inventory_provider/juniper.py +++ b/inventory_provider/juniper.py @@ -80,7 +80,51 @@ def ssh_exec_commands(hostname, ssh_params, commands): yield stdout.read().decode("utf-8") -def shell_commands(): +def _loads(s, **args): + """ + the json text contains raw backslashes + :param s: + :param args: + :return: + """ + return json.loads(s.replace("\\", "\\\\"), **args) + + +_DISABLE_PAGING_COMMAND = r'set cli screen-length 0' + + +def fetch_bgp_config(hostname, ssh_params, **args): + + commands = [ + _DISABLE_PAGING_COMMAND, + ('show configuration routing-instances' + ' IAS protocols bgp | display json') + ] + + output = list(ssh_exec_commands(hostname, ssh_params, commands)) + assert len(output) == len(commands) + + if output[1]: + return list(neighbors(_loads(output[1]), **args)) + else: + return {} + + +def fetch_vrr_config(hostname, ssh_params): + + commands = [ + _DISABLE_PAGING_COMMAND, + ('show configuration logical-systems ' + 'VRR protocols bgp | display json') + ] + + output = list(ssh_exec_commands(hostname, ssh_params, commands)) + assert len(output) == len(commands) + + return _loads(output[1]) if output[1] else {} + + +def fetch_interfaces(hostname, ssh_params): def _dups_to_list(pairs): counter_map = {} @@ -94,47 +138,14 @@ def shell_commands(): result[k] = v return result - def _loads(s, **args): - """ - the json text contains raw backslashes - :param s: - :param args: - :return: - """ - return json.loads(s.replace("\\", "\\\\"), **args) - - yield { - "command": "set cli screen-length 0", - "key": None, - "parser": lambda _: None - } - - def _parse_bgp_output(txt, **args): - if txt: - return list(neighbors(_loads(txt), **args)) - else: - return {} - - yield { - "command": ('show configuration routing-instances' - ' IAS protocols bgp | display json'), - "key": "bgp", - "parser": _parse_bgp_output - } - - yield { - "command": ('show configuration logical-systems ' - 'VRR protocols bgp | display json'), - "key": "vrr", - "parser": lambda txt: _loads(txt) if txt else {} - } - - yield { - "command": 'show interfaces descriptions | display json', - "key": "interfaces", - "parser": lambda txt: list(interfaces( - _loads( - txt, - object_pairs_hook=_dups_to_list) - )) if txt else {} - } + commands = [ + _DISABLE_PAGING_COMMAND, + 'show interfaces descriptions | display json' + ] + + output = list(ssh_exec_commands(hostname, ssh_params, commands)) + assert len(output) == len(commands) + + return _loads( + output[1], + object_pairs_hook=_dups_to_list) if output[1] else {} \ No newline at end of file diff --git a/inventory_provider/router_details.py b/inventory_provider/router_details.py index fe38136e..a546deb9 100644 --- a/inventory_provider/router_details.py +++ b/inventory_provider/router_details.py @@ -4,9 +4,8 @@ from multiprocessing import Process, Queue import redis -from inventory_provider import constants +from inventory_provider import constants, juniper from inventory_provider import snmp -from inventory_provider import juniper def get_router_interfaces_q(router, params, q): diff --git a/inventory_provider/tasks/__init__.py b/inventory_provider/tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/inventory_provider/tasks/app.py b/inventory_provider/tasks/app.py new file mode 100644 index 00000000..b0fe6ba0 --- /dev/null +++ b/inventory_provider/tasks/app.py @@ -0,0 +1,6 @@ +from celery import Celery +from celery.bin import Option + + +app = Celery("app") +app.config_from_object("inventory_provider.tasks.config") diff --git a/inventory_provider/tasks/config.py b/inventory_provider/tasks/config.py new file mode 100644 index 00000000..8200911b --- /dev/null +++ b/inventory_provider/tasks/config.py @@ -0,0 +1,5 @@ +from os import getenv + +broker_url = getenv( + 'CELERY_BROKER_URL', + default='redis://test-dashboard02.geant.org:6379/1') diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py new file mode 100644 index 00000000..55fa8614 --- /dev/null +++ b/inventory_provider/tasks/worker.py @@ -0,0 +1,71 @@ +import json + +from celery import bootsteps, Task +import redis + +from inventory_provider.tasks.app import app +from inventory_provider import config +from inventory_provider import juniper + + +class InventoryTask(Task): + + config = None + + def __init__(self): + pass + + @staticmethod + def save_key(hostname, key, data): + r = redis.StrictRedis( + host=InventoryTask.config["redis"]["hostname"], + port=InventoryTask.config["redis"]["port"]) + r.hset( + name=hostname, + key=key, + value=json.dumps(data)) + return "OK" + + +class WorkerArgs(bootsteps.Step): + def __init__(self, worker, config_filename, **options): + with open(config_filename) as f: + InventoryTask.config = config.load(f) + + +def worker_args(parser): + parser.add_argument( + "--config_filename", + dest="config_filename", + action='store', + type=str, + help="Configuration filename") + + +app.user_options['worker'].add(worker_args) +app.steps['worker'].add(WorkerArgs) + + +@app.task(bind=InventoryTask) +def juniper_refresh_bgp(self, hostname): + InventoryTask.save_key( + hostname, + "bgp", + juniper.fetch_bgp_config(hostname, InventoryTask.config["ssh"])) + + +@app.task(bind=InventoryTask) +def juniper_refresh_vrr(self, hostname): + InventoryTask.save_key( + hostname, + "vrr", + juniper.fetch_vrr_config(hostname, InventoryTask.config["ssh"])) + + +@app.task(bind=InventoryTask) +def juniper_refresh_interfaces(self, hostname): + InventoryTask.save_key( + hostname, + "interfaces", + juniper.fetch_interfaces(hostname, InventoryTask.config["ssh"])) + diff --git a/requirements.txt b/requirements.txt index 5ae83dfd..21adce50 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ jsonschema paramiko flask redis +celery pytest pytest-mock diff --git a/setup.py b/setup.py index 86b8970e..7bd7ee5d 100644 --- a/setup.py +++ b/setup.py @@ -16,6 +16,7 @@ setup( 'jsonschema', 'paramiko', 'flask', - 'redis' + 'redis', + 'celery' ] ) -- GitLab