diff --git a/inventory_provider/app.py b/inventory_provider/app.py index 25b9d63b51a4cc095c393f4d1fd2e75eaade4248..96316f2319e545c2e8094b08d024a72c4f0b69a8 100644 --- a/inventory_provider/app.py +++ b/inventory_provider/app.py @@ -9,7 +9,7 @@ from inventory_provider import constants logging.basicConfig(level=logging.WARNING) logging.getLogger(constants.SNMP_LOGGER_NAME).setLevel(logging.DEBUG) -logging.getLogger(constants.THREADING_LOGGER_NAME).setLevel(logging.INFO) +logging.getLogger(constants.TASK_LOGGER_NAME).setLevel(logging.INFO) logging.getLogger(constants.JUNIPER_LOGGER_NAME).setLevel(logging.DEBUG) logging.getLogger(constants.DATABASE_LOGGER_NAME).setLevel(logging.DEBUG) diff --git a/inventory_provider/constants.py b/inventory_provider/constants.py index 6c51564517c231ef643075ebd2703b73725cbd4b..f88475574e7b44dd3d17116027a2ba6c8588a0a6 100644 --- a/inventory_provider/constants.py +++ b/inventory_provider/constants.py @@ -1,4 +1,4 @@ SNMP_LOGGER_NAME = "snmp-logger" -THREADING_LOGGER_NAME = "threading-logger" JUNIPER_LOGGER_NAME = "juniper-logger" DATABASE_LOGGER_NAME = "database-logger" +TASK_LOGGER_NAME = "task-logger" diff --git a/inventory_provider/juniper.py b/inventory_provider/juniper.py index 544113751814951a09f4051cb74f151bbff00143..d801c8d329f781a1e5ee792c4b429f749eb65d4c 100644 --- a/inventory_provider/juniper.py +++ b/inventory_provider/juniper.py @@ -80,7 +80,51 @@ def ssh_exec_commands(hostname, ssh_params, commands): yield stdout.read().decode("utf-8") -def shell_commands(): +def _loads(s, **args): + """ + the json text contains raw backslashes + :param s: + :param args: + :return: + """ + return json.loads(s.replace("\\", "\\\\"), **args) + + +_DISABLE_PAGING_COMMAND = r'set cli screen-length 0' + + +def fetch_bgp_config(hostname, ssh_params, **args): + + commands = [ + _DISABLE_PAGING_COMMAND, + ('show configuration routing-instances' + ' IAS protocols bgp | display json') + ] + + output = list(ssh_exec_commands(hostname, ssh_params, commands)) + assert len(output) == len(commands) + + if output[1]: + return list(neighbors(_loads(output[1]), **args)) + else: + return {} + + +def fetch_vrr_config(hostname, ssh_params): + + commands = [ + _DISABLE_PAGING_COMMAND, + ('show configuration logical-systems ' + 'VRR protocols bgp | display json') + ] + + output = list(ssh_exec_commands(hostname, ssh_params, commands)) + assert len(output) == len(commands) + + return _loads(output[1]) if output[1] else {} + + +def fetch_interfaces(hostname, ssh_params): def _dups_to_list(pairs): counter_map = {} @@ -94,47 +138,14 @@ def shell_commands(): result[k] = v return result - def _loads(s, **args): - """ - the json text contains raw backslashes - :param s: - :param args: - :return: - """ - return json.loads(s.replace("\\", "\\\\"), **args) - - yield { - "command": "set cli screen-length 0", - "key": None, - "parser": lambda _: None - } - - def _parse_bgp_output(txt, **args): - if txt: - return list(neighbors(_loads(txt), **args)) - else: - return {} - - yield { - "command": ('show configuration routing-instances' - ' IAS protocols bgp | display json'), - "key": "bgp", - "parser": _parse_bgp_output - } - - yield { - "command": ('show configuration logical-systems ' - 'VRR protocols bgp | display json'), - "key": "vrr", - "parser": lambda txt: _loads(txt) if txt else {} - } - - yield { - "command": 'show interfaces descriptions | display json', - "key": "interfaces", - "parser": lambda txt: list(interfaces( - _loads( - txt, - object_pairs_hook=_dups_to_list) - )) if txt else {} - } + commands = [ + _DISABLE_PAGING_COMMAND, + 'show interfaces descriptions | display json' + ] + + output = list(ssh_exec_commands(hostname, ssh_params, commands)) + assert len(output) == len(commands) + + return _loads( + output[1], + object_pairs_hook=_dups_to_list) if output[1] else {} diff --git a/inventory_provider/router_details.py b/inventory_provider/router_details.py index fe38136e2755020167a0b7f104b781bb8332e263..5cb95e39d9e7efd415f15b2e9f52cfc10205ca4a 100644 --- a/inventory_provider/router_details.py +++ b/inventory_provider/router_details.py @@ -1,98 +1,43 @@ import json import logging -from multiprocessing import Process, Queue import redis -from inventory_provider import constants -from inventory_provider import snmp -from inventory_provider import juniper - - -def get_router_interfaces_q(router, params, q): - threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME) - threading_logger.debug("[ENTER>>] get_router_interfaces_q: %r" % router) - q.put(list(snmp.get_router_interfaces(router, params))) - threading_logger.debug("[<<EXIT] get_router_interfaces_q: %r" % router) - - -def ssh_exec_commands_q(hostname, ssh_params, commands, q): - threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME) - threading_logger.debug("[ENTER>>] exec_router_commands_q: %r" % hostname) - q.put(list(juniper.ssh_exec_commands(hostname, ssh_params, commands))) - threading_logger.debug("[<<EXIT] exec_router_commands_q: %r" % hostname) - - -def get_router_details(router, params): - - threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME) - - threading_logger.debug("[ENTER>>]get_router_details: %r" % router) - - commands = list(juniper.shell_commands()) - - snmpifc_proc_queue = Queue() - snmpifc_proc = Process( - target=get_router_interfaces_q, - args=(router, params, snmpifc_proc_queue)) - snmpifc_proc.start() - - commands_proc_queue = Queue() - commands_proc = Process( - target=ssh_exec_commands_q, - args=( - router["hostname"], - params["ssh"], - [c["command"] for c in commands], - commands_proc_queue)) - commands_proc.start() - - threading_logger.debug("waiting for commands result: %r" % router) - command_output = commands_proc_queue.get() - assert len(command_output) == len(commands) - - r = redis.StrictRedis( - host=params["redis"]["hostname"], - port=params["redis"]["port"]) - for c, o in zip(commands, command_output): - if c["key"]: - r.hset( - name=router["hostname"], - key=c["key"], - value=json.dumps(c["parser"](o))) - commands_proc.join() - threading_logger.debug("... got commands result & joined: %r" % router) - - threading_logger.debug("waiting for snmp ifc results: %r" % router) - r.hset( - name=router["hostname"], - key="snmp-interfaces", - value=json.dumps(snmpifc_proc_queue.get())) - snmpifc_proc.join() - threading_logger.debug("... got snmp ifc result & joined: %r" % router) - - threading_logger.debug("[<<EXIT]get_router_details: %r" % router) +from inventory_provider.tasks.app import app +from inventory_provider.constants import TASK_LOGGER_NAME + + +def get_router_details(router): + task_logger = logging.getLogger(TASK_LOGGER_NAME) + task_logger.debug("launching task: " + "inventory_provider.tasks.worker.juniper_refresh_bgp") + app.send_task( + 'inventory_provider.tasks.worker.juniper_refresh_bgp', + args=[router["hostname"]]) + task_logger.debug("launching task: " + "inventory_provider.tasks.worker.juniper_refresh_vrr") + app.send_task( + 'inventory_provider.tasks.worker.juniper_refresh_vrr', + args=[router["hostname"]]) + task_logger.debug("launching task: " + "inventory_provider" + ".tasks.worker.juniper_refresh_interfaces") + app.send_task( + 'inventory_provider.tasks.worker.juniper_refresh_interfaces', + args=[router["hostname"]]) + task_logger.debug("launching task: " + "inventory_provider" + ".tasks.worker.snmp_refresh_interfaces") + app.send_task( + 'inventory_provider.tasks.worker.snmp_refresh_interfaces', + args=[router["hostname"], router["community"]]) def update_network_details(params): - - threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME) - - processes = [] + task_logger = logging.getLogger(TASK_LOGGER_NAME) for r in params["routers"]: - p = Process(target=get_router_details, args=(r, params)) - p.start() - processes.append({"router": r, "process": p}) - - result = {} - for p in processes: - threading_logger.debug( - "waiting for get_router_details result: %r" % p["router"]) - p["process"].join() - threading_logger.debug( - "got result and joined get_router_details proc: %r" % p["router"]) - - return result + task_logger.info("fetching router details for: %r" % r) + get_router_details(r) def load_network_details(redis_params): @@ -111,3 +56,15 @@ def load_network_details(redis_params): result[hostname.decode("utf-8")] = host return result + + +if __name__ == "__main__": + from inventory_provider import config + with open("config.json") as f: + params = config.load(f) + + # update_network_details(params) + + network_info = load_network_details(params["redis"]) + with open("./router-info.json", "w") as f: + f.write(json.dumps(network_info)) diff --git a/inventory_provider/snmp.py b/inventory_provider/snmp.py index 5d5884031192fc05be4afe584f9e0bebc02c70d0..9492c27dbe293a76f08323135480fa5cf1e803e5 100644 --- a/inventory_provider/snmp.py +++ b/inventory_provider/snmp.py @@ -68,12 +68,12 @@ def walk(agent_hostname, community, base_oid): yield {"oid": "." + str(oid), "value": val.prettyPrint()} -def get_router_interfaces(router, config): +def get_router_interfaces(hostname, community, config): oid_map = config["oids"] details = {} for name, oid in oid_map.items(): - details[name] = walk(router["hostname"], router["community"], oid) + details[name] = walk(hostname, community, oid) details[name] = list(details[name]) v4IfcNames = {} diff --git a/inventory_provider/tasks/__init__.py b/inventory_provider/tasks/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/inventory_provider/tasks/app.py b/inventory_provider/tasks/app.py new file mode 100644 index 0000000000000000000000000000000000000000..15a42327ee55a1058b513bb32f52a5f388069efa --- /dev/null +++ b/inventory_provider/tasks/app.py @@ -0,0 +1,4 @@ +from celery import Celery + +app = Celery("app") +app.config_from_object("inventory_provider.tasks.config") diff --git a/inventory_provider/tasks/config.py b/inventory_provider/tasks/config.py new file mode 100644 index 0000000000000000000000000000000000000000..8200911b3d1a5c60b410949bde6a40c9edd67aba --- /dev/null +++ b/inventory_provider/tasks/config.py @@ -0,0 +1,5 @@ +from os import getenv + +broker_url = getenv( + 'CELERY_BROKER_URL', + default='redis://test-dashboard02.geant.org:6379/1') diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py new file mode 100644 index 0000000000000000000000000000000000000000..acc0ed2b2feaf7af946ef3f89554ae094d25b7bf --- /dev/null +++ b/inventory_provider/tasks/worker.py @@ -0,0 +1,81 @@ +import json + +from celery import bootsteps, Task +import redis + +from inventory_provider.tasks.app import app +from inventory_provider import config +from inventory_provider import juniper, snmp + + +class InventoryTask(Task): + + config = None + + def __init__(self): + pass + + @staticmethod + def save_key(hostname, key, data): + r = redis.StrictRedis( + host=InventoryTask.config["redis"]["hostname"], + port=InventoryTask.config["redis"]["port"]) + r.hset( + name=hostname, + key=key, + value=json.dumps(data)) + return "OK" + + +class WorkerArgs(bootsteps.Step): + def __init__(self, worker, config_filename, **options): + with open(config_filename) as f: + InventoryTask.config = config.load(f) + + +def worker_args(parser): + parser.add_argument( + "--config_filename", + dest="config_filename", + action='store', + type=str, + help="Configuration filename") + + +app.user_options['worker'].add(worker_args) +app.steps['worker'].add(WorkerArgs) + + +@app.task(bind=InventoryTask) +def juniper_refresh_bgp(self, hostname): + InventoryTask.save_key( + hostname, + "bgp", + juniper.fetch_bgp_config(hostname, InventoryTask.config["ssh"])) + + +@app.task(bind=InventoryTask) +def juniper_refresh_vrr(self, hostname): + InventoryTask.save_key( + hostname, + "vrr", + juniper.fetch_vrr_config(hostname, InventoryTask.config["ssh"])) + + +@app.task(bind=InventoryTask) +def juniper_refresh_interfaces(self, hostname): + InventoryTask.save_key( + hostname, + "interfaces", + juniper.fetch_interfaces(hostname, InventoryTask.config["ssh"])) + + +@app.task(bind=InventoryTask) +def snmp_refresh_interfaces(self, hostname, community): + InventoryTask.save_key( + hostname, + "interfaces", + list(snmp.get_router_interfaces( + hostname, + community, + InventoryTask.config))) diff --git a/requirements.txt b/requirements.txt index 5ae83dfda7753c6fc1080dade3f4fe7f4696835a..21adce509bee7dfcc31ee1e5e04cd24d199a3333 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ jsonschema paramiko flask redis +celery pytest pytest-mock diff --git a/setup.py b/setup.py index 86b8970e410379f3561e34bb4093ff5b190d96e6..7bd7ee5d2dc6631242a2e80c7ba373fe9655adb1 100644 --- a/setup.py +++ b/setup.py @@ -16,6 +16,7 @@ setup( 'jsonschema', 'paramiko', 'flask', - 'redis' + 'redis', + 'celery' ] ) diff --git a/test/test_juniper_data.py b/test/test_juniper_data.py index 3157a7d5528ef45177fc07595d664d294fa70464..b35403f97424728a9c02043ca25f944cbd4d7da1 100644 --- a/test/test_juniper_data.py +++ b/test/test_juniper_data.py @@ -119,16 +119,24 @@ def router_output(request): return output -def test_ipv4_neighbors(router_output): +def test_ipv4_neighbors(mocker, router_output): old_v4_data = dict([ (x["neighbor"], x) for x in _parsed_old_style_output_data(router_output["bgpv4"])]) - parsers = dict([(c["key"], c["parser"]) for c in juniper.shell_commands()]) - neighbors = parsers["bgp"]( - router_output["bgp"], + def _mocked_ssh_exec_commands(hostname, params, commands): + assert len(commands) == 2 # the expected number + return [None, router_output["bgp"]] + + mocker.patch( + 'inventory_provider.juniper.ssh_exec_commands', + _mocked_ssh_exec_commands) + + neighbors = juniper.fetch_bgp_config( + None, + None, group_expression=r'^GEANT-IX[\s-].*$') assert len(neighbors) == len(old_v4_data) @@ -138,16 +146,24 @@ def test_ipv4_neighbors(router_output): assert old_v4_data[address]["description"] == description -def test_ipv6_neighbors(router_output): +def test_ipv6_neighbors(mocker, router_output): old_v6_data = dict([ (x["neighbor"], x) for x in _parsed_old_style_output_data(router_output["bgpv6"])]) - parsers = dict([(c["key"], c["parser"]) for c in juniper.shell_commands()]) - neighbors = parsers["bgp"]( - router_output["bgp"], + def _mocked_ssh_exec_commands(hostname, params, commands): + assert len(commands) == 2 # the expected number + return [None, router_output["bgp"]] + + mocker.patch( + 'inventory_provider.juniper.ssh_exec_commands', + _mocked_ssh_exec_commands) + + neighbors = juniper.fetch_bgp_config( + None, + None, group_expression=r'^GEANT-IXv6[\s-].*$') assert len(neighbors) == len(old_v6_data) @@ -157,7 +173,14 @@ def test_ipv6_neighbors(router_output): assert old_v6_data[address]["description"] == description -def test_juniper_shell_output_parsing(router_output): +COMMAND_HANDLERS = { + 'bgp': juniper.fetch_bgp_config, + 'vrr': juniper.fetch_vrr_config, + 'interfaces': juniper.fetch_interfaces +} + + +def test_juniper_shell_output_parsing(mocker, router_output): """ just call the correct parser for each type of shell output (not a proper test ... just verifies there's no crash) @@ -166,7 +189,17 @@ def test_juniper_shell_output_parsing(router_output): :param router_output: :return: """ - for c in juniper.shell_commands(): - if c["key"] is None: - continue - c["parser"](router_output[c["key"]]) + + output = None + + def _mocked_ssh_exec_commands(hostname, params, commands): + assert len(commands) == 2 # the expected number + return [None, output] + + mocker.patch( + 'inventory_provider.juniper.ssh_exec_commands', + _mocked_ssh_exec_commands) + + for key in ["bgp", "vrr", "interfaces"]: + output = router_output[key] + COMMAND_HANDLERS[key](None, None)