Skip to content
Snippets Groups Projects
Commit 8eb02466 authored by Erik Reid's avatar Erik Reid
Browse files

Finished feature celery-tasks.

parents 6e5a0401 64113ac8
No related branches found
No related tags found
No related merge requests found
...@@ -9,7 +9,7 @@ from inventory_provider import constants ...@@ -9,7 +9,7 @@ from inventory_provider import constants
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logging.getLogger(constants.SNMP_LOGGER_NAME).setLevel(logging.DEBUG) logging.getLogger(constants.SNMP_LOGGER_NAME).setLevel(logging.DEBUG)
logging.getLogger(constants.THREADING_LOGGER_NAME).setLevel(logging.INFO) logging.getLogger(constants.TASK_LOGGER_NAME).setLevel(logging.INFO)
logging.getLogger(constants.JUNIPER_LOGGER_NAME).setLevel(logging.DEBUG) logging.getLogger(constants.JUNIPER_LOGGER_NAME).setLevel(logging.DEBUG)
logging.getLogger(constants.DATABASE_LOGGER_NAME).setLevel(logging.DEBUG) logging.getLogger(constants.DATABASE_LOGGER_NAME).setLevel(logging.DEBUG)
......
SNMP_LOGGER_NAME = "snmp-logger" SNMP_LOGGER_NAME = "snmp-logger"
THREADING_LOGGER_NAME = "threading-logger"
JUNIPER_LOGGER_NAME = "juniper-logger" JUNIPER_LOGGER_NAME = "juniper-logger"
DATABASE_LOGGER_NAME = "database-logger" DATABASE_LOGGER_NAME = "database-logger"
TASK_LOGGER_NAME = "task-logger"
...@@ -80,7 +80,51 @@ def ssh_exec_commands(hostname, ssh_params, commands): ...@@ -80,7 +80,51 @@ def ssh_exec_commands(hostname, ssh_params, commands):
yield stdout.read().decode("utf-8") yield stdout.read().decode("utf-8")
def shell_commands(): def _loads(s, **args):
"""
the json text contains raw backslashes
:param s:
:param args:
:return:
"""
return json.loads(s.replace("\\", "\\\\"), **args)
_DISABLE_PAGING_COMMAND = r'set cli screen-length 0'
def fetch_bgp_config(hostname, ssh_params, **args):
commands = [
_DISABLE_PAGING_COMMAND,
('show configuration routing-instances'
' IAS protocols bgp | display json')
]
output = list(ssh_exec_commands(hostname, ssh_params, commands))
assert len(output) == len(commands)
if output[1]:
return list(neighbors(_loads(output[1]), **args))
else:
return {}
def fetch_vrr_config(hostname, ssh_params):
commands = [
_DISABLE_PAGING_COMMAND,
('show configuration logical-systems '
'VRR protocols bgp | display json')
]
output = list(ssh_exec_commands(hostname, ssh_params, commands))
assert len(output) == len(commands)
return _loads(output[1]) if output[1] else {}
def fetch_interfaces(hostname, ssh_params):
def _dups_to_list(pairs): def _dups_to_list(pairs):
counter_map = {} counter_map = {}
...@@ -94,47 +138,14 @@ def shell_commands(): ...@@ -94,47 +138,14 @@ def shell_commands():
result[k] = v result[k] = v
return result return result
def _loads(s, **args): commands = [
""" _DISABLE_PAGING_COMMAND,
the json text contains raw backslashes 'show interfaces descriptions | display json'
:param s: ]
:param args:
:return: output = list(ssh_exec_commands(hostname, ssh_params, commands))
""" assert len(output) == len(commands)
return json.loads(s.replace("\\", "\\\\"), **args)
return _loads(
yield { output[1],
"command": "set cli screen-length 0", object_pairs_hook=_dups_to_list) if output[1] else {}
"key": None,
"parser": lambda _: None
}
def _parse_bgp_output(txt, **args):
if txt:
return list(neighbors(_loads(txt), **args))
else:
return {}
yield {
"command": ('show configuration routing-instances'
' IAS protocols bgp | display json'),
"key": "bgp",
"parser": _parse_bgp_output
}
yield {
"command": ('show configuration logical-systems '
'VRR protocols bgp | display json'),
"key": "vrr",
"parser": lambda txt: _loads(txt) if txt else {}
}
yield {
"command": 'show interfaces descriptions | display json',
"key": "interfaces",
"parser": lambda txt: list(interfaces(
_loads(
txt,
object_pairs_hook=_dups_to_list)
)) if txt else {}
}
import json import json
import logging import logging
from multiprocessing import Process, Queue
import redis import redis
from inventory_provider import constants from inventory_provider.tasks.app import app
from inventory_provider import snmp from inventory_provider.constants import TASK_LOGGER_NAME
from inventory_provider import juniper
def get_router_details(router):
def get_router_interfaces_q(router, params, q): task_logger = logging.getLogger(TASK_LOGGER_NAME)
threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME) task_logger.debug("launching task: "
threading_logger.debug("[ENTER>>] get_router_interfaces_q: %r" % router) "inventory_provider.tasks.worker.juniper_refresh_bgp")
q.put(list(snmp.get_router_interfaces(router, params))) app.send_task(
threading_logger.debug("[<<EXIT] get_router_interfaces_q: %r" % router) 'inventory_provider.tasks.worker.juniper_refresh_bgp',
args=[router["hostname"]])
task_logger.debug("launching task: "
def ssh_exec_commands_q(hostname, ssh_params, commands, q): "inventory_provider.tasks.worker.juniper_refresh_vrr")
threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME) app.send_task(
threading_logger.debug("[ENTER>>] exec_router_commands_q: %r" % hostname) 'inventory_provider.tasks.worker.juniper_refresh_vrr',
q.put(list(juniper.ssh_exec_commands(hostname, ssh_params, commands))) args=[router["hostname"]])
threading_logger.debug("[<<EXIT] exec_router_commands_q: %r" % hostname) task_logger.debug("launching task: "
"inventory_provider"
".tasks.worker.juniper_refresh_interfaces")
def get_router_details(router, params): app.send_task(
'inventory_provider.tasks.worker.juniper_refresh_interfaces',
threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME) args=[router["hostname"]])
task_logger.debug("launching task: "
threading_logger.debug("[ENTER>>]get_router_details: %r" % router) "inventory_provider"
".tasks.worker.snmp_refresh_interfaces")
commands = list(juniper.shell_commands()) app.send_task(
'inventory_provider.tasks.worker.snmp_refresh_interfaces',
snmpifc_proc_queue = Queue() args=[router["hostname"], router["community"]])
snmpifc_proc = Process(
target=get_router_interfaces_q,
args=(router, params, snmpifc_proc_queue))
snmpifc_proc.start()
commands_proc_queue = Queue()
commands_proc = Process(
target=ssh_exec_commands_q,
args=(
router["hostname"],
params["ssh"],
[c["command"] for c in commands],
commands_proc_queue))
commands_proc.start()
threading_logger.debug("waiting for commands result: %r" % router)
command_output = commands_proc_queue.get()
assert len(command_output) == len(commands)
r = redis.StrictRedis(
host=params["redis"]["hostname"],
port=params["redis"]["port"])
for c, o in zip(commands, command_output):
if c["key"]:
r.hset(
name=router["hostname"],
key=c["key"],
value=json.dumps(c["parser"](o)))
commands_proc.join()
threading_logger.debug("... got commands result & joined: %r" % router)
threading_logger.debug("waiting for snmp ifc results: %r" % router)
r.hset(
name=router["hostname"],
key="snmp-interfaces",
value=json.dumps(snmpifc_proc_queue.get()))
snmpifc_proc.join()
threading_logger.debug("... got snmp ifc result & joined: %r" % router)
threading_logger.debug("[<<EXIT]get_router_details: %r" % router)
def update_network_details(params): def update_network_details(params):
task_logger = logging.getLogger(TASK_LOGGER_NAME)
threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME)
processes = []
for r in params["routers"]: for r in params["routers"]:
p = Process(target=get_router_details, args=(r, params)) task_logger.info("fetching router details for: %r" % r)
p.start() get_router_details(r)
processes.append({"router": r, "process": p})
result = {}
for p in processes:
threading_logger.debug(
"waiting for get_router_details result: %r" % p["router"])
p["process"].join()
threading_logger.debug(
"got result and joined get_router_details proc: %r" % p["router"])
return result
def load_network_details(redis_params): def load_network_details(redis_params):
...@@ -111,3 +56,15 @@ def load_network_details(redis_params): ...@@ -111,3 +56,15 @@ def load_network_details(redis_params):
result[hostname.decode("utf-8")] = host result[hostname.decode("utf-8")] = host
return result return result
if __name__ == "__main__":
from inventory_provider import config
with open("config.json") as f:
params = config.load(f)
# update_network_details(params)
network_info = load_network_details(params["redis"])
with open("./router-info.json", "w") as f:
f.write(json.dumps(network_info))
...@@ -68,12 +68,12 @@ def walk(agent_hostname, community, base_oid): ...@@ -68,12 +68,12 @@ def walk(agent_hostname, community, base_oid):
yield {"oid": "." + str(oid), "value": val.prettyPrint()} yield {"oid": "." + str(oid), "value": val.prettyPrint()}
def get_router_interfaces(router, config): def get_router_interfaces(hostname, community, config):
oid_map = config["oids"] oid_map = config["oids"]
details = {} details = {}
for name, oid in oid_map.items(): for name, oid in oid_map.items():
details[name] = walk(router["hostname"], router["community"], oid) details[name] = walk(hostname, community, oid)
details[name] = list(details[name]) details[name] = list(details[name])
v4IfcNames = {} v4IfcNames = {}
......
from celery import Celery
app = Celery("app")
app.config_from_object("inventory_provider.tasks.config")
from os import getenv
broker_url = getenv(
'CELERY_BROKER_URL',
default='redis://test-dashboard02.geant.org:6379/1')
import json
from celery import bootsteps, Task
import redis
from inventory_provider.tasks.app import app
from inventory_provider import config
from inventory_provider import juniper, snmp
class InventoryTask(Task):
config = None
def __init__(self):
pass
@staticmethod
def save_key(hostname, key, data):
r = redis.StrictRedis(
host=InventoryTask.config["redis"]["hostname"],
port=InventoryTask.config["redis"]["port"])
r.hset(
name=hostname,
key=key,
value=json.dumps(data))
return "OK"
class WorkerArgs(bootsteps.Step):
def __init__(self, worker, config_filename, **options):
with open(config_filename) as f:
InventoryTask.config = config.load(f)
def worker_args(parser):
parser.add_argument(
"--config_filename",
dest="config_filename",
action='store',
type=str,
help="Configuration filename")
app.user_options['worker'].add(worker_args)
app.steps['worker'].add(WorkerArgs)
@app.task(bind=InventoryTask)
def juniper_refresh_bgp(self, hostname):
InventoryTask.save_key(
hostname,
"bgp",
juniper.fetch_bgp_config(hostname, InventoryTask.config["ssh"]))
@app.task(bind=InventoryTask)
def juniper_refresh_vrr(self, hostname):
InventoryTask.save_key(
hostname,
"vrr",
juniper.fetch_vrr_config(hostname, InventoryTask.config["ssh"]))
@app.task(bind=InventoryTask)
def juniper_refresh_interfaces(self, hostname):
InventoryTask.save_key(
hostname,
"interfaces",
juniper.fetch_interfaces(hostname, InventoryTask.config["ssh"]))
@app.task(bind=InventoryTask)
def snmp_refresh_interfaces(self, hostname, community):
InventoryTask.save_key(
hostname,
"interfaces",
list(snmp.get_router_interfaces(
hostname,
community,
InventoryTask.config)))
...@@ -5,6 +5,7 @@ jsonschema ...@@ -5,6 +5,7 @@ jsonschema
paramiko paramiko
flask flask
redis redis
celery
pytest pytest
pytest-mock pytest-mock
...@@ -16,6 +16,7 @@ setup( ...@@ -16,6 +16,7 @@ setup(
'jsonschema', 'jsonschema',
'paramiko', 'paramiko',
'flask', 'flask',
'redis' 'redis',
'celery'
] ]
) )
...@@ -119,16 +119,24 @@ def router_output(request): ...@@ -119,16 +119,24 @@ def router_output(request):
return output return output
def test_ipv4_neighbors(router_output): def test_ipv4_neighbors(mocker, router_output):
old_v4_data = dict([ old_v4_data = dict([
(x["neighbor"], x) (x["neighbor"], x)
for x in for x in
_parsed_old_style_output_data(router_output["bgpv4"])]) _parsed_old_style_output_data(router_output["bgpv4"])])
parsers = dict([(c["key"], c["parser"]) for c in juniper.shell_commands()]) def _mocked_ssh_exec_commands(hostname, params, commands):
neighbors = parsers["bgp"]( assert len(commands) == 2 # the expected number
router_output["bgp"], return [None, router_output["bgp"]]
mocker.patch(
'inventory_provider.juniper.ssh_exec_commands',
_mocked_ssh_exec_commands)
neighbors = juniper.fetch_bgp_config(
None,
None,
group_expression=r'^GEANT-IX[\s-].*$') group_expression=r'^GEANT-IX[\s-].*$')
assert len(neighbors) == len(old_v4_data) assert len(neighbors) == len(old_v4_data)
...@@ -138,16 +146,24 @@ def test_ipv4_neighbors(router_output): ...@@ -138,16 +146,24 @@ def test_ipv4_neighbors(router_output):
assert old_v4_data[address]["description"] == description assert old_v4_data[address]["description"] == description
def test_ipv6_neighbors(router_output): def test_ipv6_neighbors(mocker, router_output):
old_v6_data = dict([ old_v6_data = dict([
(x["neighbor"], x) (x["neighbor"], x)
for x in for x in
_parsed_old_style_output_data(router_output["bgpv6"])]) _parsed_old_style_output_data(router_output["bgpv6"])])
parsers = dict([(c["key"], c["parser"]) for c in juniper.shell_commands()]) def _mocked_ssh_exec_commands(hostname, params, commands):
neighbors = parsers["bgp"]( assert len(commands) == 2 # the expected number
router_output["bgp"], return [None, router_output["bgp"]]
mocker.patch(
'inventory_provider.juniper.ssh_exec_commands',
_mocked_ssh_exec_commands)
neighbors = juniper.fetch_bgp_config(
None,
None,
group_expression=r'^GEANT-IXv6[\s-].*$') group_expression=r'^GEANT-IXv6[\s-].*$')
assert len(neighbors) == len(old_v6_data) assert len(neighbors) == len(old_v6_data)
...@@ -157,7 +173,14 @@ def test_ipv6_neighbors(router_output): ...@@ -157,7 +173,14 @@ def test_ipv6_neighbors(router_output):
assert old_v6_data[address]["description"] == description assert old_v6_data[address]["description"] == description
def test_juniper_shell_output_parsing(router_output): COMMAND_HANDLERS = {
'bgp': juniper.fetch_bgp_config,
'vrr': juniper.fetch_vrr_config,
'interfaces': juniper.fetch_interfaces
}
def test_juniper_shell_output_parsing(mocker, router_output):
""" """
just call the correct parser for each type of shell output just call the correct parser for each type of shell output
(not a proper test ... just verifies there's no crash) (not a proper test ... just verifies there's no crash)
...@@ -166,7 +189,17 @@ def test_juniper_shell_output_parsing(router_output): ...@@ -166,7 +189,17 @@ def test_juniper_shell_output_parsing(router_output):
:param router_output: :param router_output:
:return: :return:
""" """
for c in juniper.shell_commands():
if c["key"] is None: output = None
continue
c["parser"](router_output[c["key"]]) def _mocked_ssh_exec_commands(hostname, params, commands):
assert len(commands) == 2 # the expected number
return [None, output]
mocker.patch(
'inventory_provider.juniper.ssh_exec_commands',
_mocked_ssh_exec_commands)
for key in ["bgp", "vrr", "interfaces"]:
output = router_output[key]
COMMAND_HANDLERS[key](None, None)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment