Commit 71f23668 authored by Release Webservice

Finished release 0.2.

parents e3330321 8684e516
0.1: initial skeleton
0.2: use celery for task management
\ No newline at end of file
"""
default app creation
"""
import logging
import sys
import inventory_provider
from inventory_provider import constants
logging.basicConfig(stream=sys.stderr, level=logging.WARNING)
logging.getLogger(constants.SNMP_LOGGER_NAME).setLevel(logging.DEBUG)
logging.getLogger(constants.TASK_LOGGER_NAME).setLevel(logging.INFO)
logging.getLogger(constants.JUNIPER_LOGGER_NAME).setLevel(logging.DEBUG)
logging.getLogger(constants.DATABASE_LOGGER_NAME).setLevel(logging.DEBUG)
app = inventory_provider.create_app()
if __name__ == "__main__":
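# "::" is the IPv6 any-address; on most dual-stack hosts the development
# server will then accept IPv4 connections as well (OS dependent)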
app.run(host="::", port=7777)
SNMP_LOGGER_NAME = "snmp-logger"
THREADING_LOGGER_NAME = "threading-logger"
JUNIPER_LOGGER_NAME = "juniper-logger"
DATABASE_LOGGER_NAME = "database-logger"
TASK_LOGGER_NAME = "task-logger"
@@ -48,5 +48,5 @@ def routers():
host=redis_config["hostname"],
port=redis_config["port"])
return Response(
json.dumps(list(r.keys("*"))),
json.dumps(list([k.decode("utf-8") for k in r.keys("*")])),
mimetype="application/json")
@@ -80,7 +80,51 @@ def ssh_exec_commands(hostname, ssh_params, commands):
yield stdout.read().decode("utf-8")
def shell_commands():
def _loads(s, **args):
"""
the json text contains raw backslashes
:param s:
:param args:
:return:
"""
return json.loads(s.replace("\\", "\\\\"), **args)
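# hypothetical illustration of why the backslashes are doubled: Junos
# "| display json" output can contain text such as
#     {"description": "ae1\:example"}
# where "\:" is not a legal JSON escape sequence; after doubling, the raw
# text becomes "ae1\\:example", which json.loads() accepts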
_DISABLE_PAGING_COMMAND = r'set cli screen-length 0'
def fetch_bgp_config(hostname, ssh_params, **args):
commands = [
_DISABLE_PAGING_COMMAND,
('show configuration routing-instances'
' IAS protocols bgp | display json')
]
output = list(ssh_exec_commands(hostname, ssh_params, commands))
assert len(output) == len(commands)
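# output[0] is the (ignored) response to the disable-paging command;
# output[1] is the json text produced by the "show configuration ..." command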
if output[1]:
return list(neighbors(_loads(output[1]), **args))
else:
return {}
def fetch_vrr_config(hostname, ssh_params):
commands = [
_DISABLE_PAGING_COMMAND,
('show configuration logical-systems '
'VRR protocols bgp | display json')
]
output = list(ssh_exec_commands(hostname, ssh_params, commands))
assert len(output) == len(commands)
return _loads(output[1]) if output[1] else {}
def fetch_interfaces(hostname, ssh_params):
def _dups_to_list(pairs):
counter_map = {}
@@ -94,47 +138,14 @@ def shell_commands():
result[k] = v
return result
def _loads(s, **args):
"""
the json text contains raw backslashes
:param s:
:param args:
:return:
"""
return json.loads(s.replace("\\", "\\\\"), **args)
yield {
"command": "set cli screen-length 0",
"key": None,
"parser": lambda _: None
}
def _parse_bgp_output(txt, **args):
if txt:
return list(neighbors(_loads(txt), **args))
else:
return {}
yield {
"command": ('show configuration routing-instances'
' IAS protocols bgp | display json'),
"key": "bgp",
"parser": _parse_bgp_output
}
yield {
"command": ('show configuration logical-systems '
'VRR protocols bgp | display json'),
"key": "vrr",
"parser": lambda txt: _loads(txt) if txt else {}
}
yield {
"command": 'show interfaces descriptions | display json',
"key": "interfaces",
"parser": lambda txt: list(interfaces(
_loads(
txt,
object_pairs_hook=_dups_to_list)
)) if txt else {}
}
commands = [
_DISABLE_PAGING_COMMAND,
'show interfaces descriptions | display json'
]
output = list(ssh_exec_commands(hostname, ssh_params, commands))
assert len(output) == len(commands)
return _loads(
output[1],
object_pairs_hook=_dups_to_list) if output[1] else {}
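# _dups_to_list is passed as json.loads' object_pairs_hook so that repeated
# keys in the interface json are preserved rather than silently collapsed
# (by default json.loads keeps only the last value for a duplicated key)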
import json
import logging
from multiprocessing import Process, Queue
import redis
from inventory_provider import constants
from inventory_provider import snmp
from inventory_provider import juniper
def get_router_interfaces_q(router, params, q):
threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME)
threading_logger.debug("[ENTER>>] get_router_interfaces_q: %r" % router)
q.put(list(snmp.get_router_interfaces(router, params)))
threading_logger.debug("[<<EXIT] get_router_interfaces_q: %r" % router)
def ssh_exec_commands_q(hostname, ssh_params, commands, q):
threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME)
threading_logger.debug("[ENTER>>] exec_router_commands_q: %r" % hostname)
q.put(list(juniper.ssh_exec_commands(hostname, ssh_params, commands)))
threading_logger.debug("[<<EXIT] exec_router_commands_q: %r" % hostname)
def get_router_details(router, params):
threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME)
threading_logger.debug("[ENTER>>]get_router_details: %r" % router)
commands = list(juniper.shell_commands())
snmpifc_proc_queue = Queue()
snmpifc_proc = Process(
target=get_router_interfaces_q,
args=(router, params, snmpifc_proc_queue))
snmpifc_proc.start()
commands_proc_queue = Queue()
commands_proc = Process(
target=ssh_exec_commands_q,
args=(
router["hostname"],
params["ssh"],
[c["command"] for c in commands],
commands_proc_queue))
commands_proc.start()
threading_logger.debug("waiting for commands result: %r" % router)
command_output = commands_proc_queue.get()
assert len(command_output) == len(commands)
r = redis.StrictRedis(
host=params["redis"]["hostname"],
port=params["redis"]["port"])
for c, o in zip(commands, command_output):
if c["key"]:
r.hset(
name=router["hostname"],
key=c["key"],
value=json.dumps(c["parser"](o)))
commands_proc.join()
threading_logger.debug("... got commands result & joined: %r" % router)
threading_logger.debug("waiting for snmp ifc results: %r" % router)
r.hset(
name=router["hostname"],
key="snmp-interfaces",
value=json.dumps(snmpifc_proc_queue.get()))
snmpifc_proc.join()
threading_logger.debug("... got snmp ifc result & joined: %r" % router)
threading_logger.debug("[<<EXIT]get_router_details: %r" % router)
from inventory_provider.tasks.app import app
from inventory_provider.constants import TASK_LOGGER_NAME
def get_router_details(router):
task_logger = logging.getLogger(TASK_LOGGER_NAME)
task_logger.debug("launching task: "
"inventory_provider.tasks.worker.juniper_refresh_bgp")
app.send_task(
'inventory_provider.tasks.worker.juniper_refresh_bgp',
args=[router["hostname"]])
task_logger.debug("launching task: "
"inventory_provider.tasks.worker.juniper_refresh_vrr")
app.send_task(
'inventory_provider.tasks.worker.juniper_refresh_vrr',
args=[router["hostname"]])
task_logger.debug("launching task: "
"inventory_provider"
".tasks.worker.juniper_refresh_interfaces")
app.send_task(
'inventory_provider.tasks.worker.juniper_refresh_interfaces',
args=[router["hostname"]])
task_logger.debug("launching task: "
"inventory_provider"
".tasks.worker.snmp_refresh_interfaces")
app.send_task(
'inventory_provider.tasks.worker.snmp_refresh_interfaces',
args=[router["hostname"], router["community"]])
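# note: app.send_task() dispatches purely by task name, so the worker
# implementation does not have to be importable in this process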
def update_network_details(params):
threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME)
processes = []
task_logger = logging.getLogger(TASK_LOGGER_NAME)
for r in params["routers"]:
p = Process(target=get_router_details, args=(r, params))
p.start()
processes.append({"router": r, "process": p})
result = {}
for p in processes:
threading_logger.debug(
"waiting for get_router_details result: %r" % p["router"])
p["process"].join()
threading_logger.debug(
"got result and joined get_router_details proc: %r" % p["router"])
return result
task_logger.info("fetching router details for: %r" % r)
get_router_details(r)
def load_network_details(redis_params):
@@ -111,3 +56,15 @@ def load_network_details(redis_params):
result[hostname.decode("utf-8")] = host
return result
if __name__ == "__main__":
from inventory_provider import config
with open("config.json") as f:
params = config.load(f)
# update_network_details(params)
network_info = load_network_details(params["redis"])
with open("./router-info.json", "w") as f:
f.write(json.dumps(network_info))
@@ -68,12 +68,12 @@ def walk(agent_hostname, community, base_oid):
yield {"oid": "." + str(oid), "value": val.prettyPrint()}
def get_router_interfaces(router, config):
def get_router_interfaces(hostname, community, config):
oid_map = config["oids"]
details = {}
for name, oid in oid_map.items():
details[name] = walk(router["hostname"], router["community"], oid)
details[name] = walk(hostname, community, oid)
details[name] = list(details[name])
v4IfcNames = {}
......
from celery import Celery
app = Celery("app")
app.config_from_object("inventory_provider.tasks.config")
from os import getenv
broker_url = getenv(
'CELERY_BROKER_URL',
default='redis://test-dashboard02.geant.org:6379/1')
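# the broker url can be overridden per deployment, e.g. (hypothetical value):
#     export CELERY_BROKER_URL=redis://localhost:6379/1
# no result backend is configured here, so tasks run fire-and-forget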
import json
from celery import bootsteps, Task
import redis
from inventory_provider.tasks.app import app
from inventory_provider import config
from inventory_provider import juniper, snmp
class InventoryTask(Task):
config = None
def __init__(self):
pass
@staticmethod
def save_key(hostname, key, data):
r = redis.StrictRedis(
host=InventoryTask.config["redis"]["hostname"],
port=InventoryTask.config["redis"]["port"])
r.hset(
name=hostname,
key=key,
value=json.dumps(data))
return "OK"
class WorkerArgs(bootsteps.Step):
def __init__(self, worker, config_filename, **options):
with open(config_filename) as f:
InventoryTask.config = config.load(f)
def worker_args(parser):
parser.add_argument(
"--config_filename",
dest="config_filename",
action='store',
type=str,
help="Configuration filename")
app.user_options['worker'].add(worker_args)
app.steps['worker'].add(WorkerArgs)
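# sketch of a worker invocation using the custom option defined above
# (exact command line depends on the deployment):
#     celery worker -A inventory_provider.tasks.worker \
#         --config_filename=config.json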
@app.task(bind=InventoryTask)
def juniper_refresh_bgp(self, hostname):
InventoryTask.save_key(
hostname,
"bgp",
juniper.fetch_bgp_config(hostname, InventoryTask.config["ssh"]))
@app.task(bind=InventoryTask)
def juniper_refresh_vrr(self, hostname):
InventoryTask.save_key(
hostname,
"vrr",
juniper.fetch_vrr_config(hostname, InventoryTask.config["ssh"]))
@app.task(bind=InventoryTask)
def juniper_refresh_interfaces(self, hostname):
InventoryTask.save_key(
hostname,
"interfaces",
juniper.fetch_interfaces(hostname, InventoryTask.config["ssh"]))
@app.task(bind=InventoryTask)
def snmp_refresh_interfaces(self, hostname, community):
InventoryTask.save_key(
hostname,
"interfaces",
list(snmp.get_router_interfaces(
hostname,
community,
InventoryTask.config)))
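# hypothetical example of queuing one of these tasks directly:
#     juniper_refresh_bgp.delay("some-router.example.org")
# in this commit they are normally launched by name via app.send_task()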
@@ -5,6 +5,7 @@ jsonschema
paramiko
flask
redis
celery
pytest
pytest-mock
@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name='inventory-provider',
version="0.1",
version="0.2",
author='GEANT',
author_email='swd@geant.org',
description='Dashboard inventory provider',
@@ -16,6 +16,7 @@ setup(
'jsonschema',
'paramiko',
'flask',
'redis'
'redis',
'celery'
]
)
@@ -184,7 +184,7 @@ class MockedRedis(object):
MockedRedis.db[key] = value
def keys(self, *args, **kwargs):
return MockedRedis.db.keys()
return list([k.encode("utf-8") for k in MockedRedis.db.keys()])
def test_routers_list(mocker, client):
......
@@ -119,16 +119,24 @@ def router_output(request):
return output
def test_ipv4_neighbors(router_output):
def test_ipv4_neighbors(mocker, router_output):
old_v4_data = dict([
(x["neighbor"], x)
for x in
_parsed_old_style_output_data(router_output["bgpv4"])])
parsers = dict([(c["key"], c["parser"]) for c in juniper.shell_commands()])
neighbors = parsers["bgp"](
router_output["bgp"],
def _mocked_ssh_exec_commands(hostname, params, commands):
assert len(commands) == 2  # the disable-paging command + one show command
return [None, router_output["bgp"]]
mocker.patch(
'inventory_provider.juniper.ssh_exec_commands',
_mocked_ssh_exec_commands)
neighbors = juniper.fetch_bgp_config(
None,
None,
group_expression=r'^GEANT-IX[\s-].*$')
assert len(neighbors) == len(old_v4_data)
@@ -138,16 +146,24 @@ def test_ipv4_neighbors(router_output):
assert old_v4_data[address]["description"] == description
def test_ipv6_neighbors(router_output):
def test_ipv6_neighbors(mocker, router_output):
old_v6_data = dict([
(x["neighbor"], x)
for x in
_parsed_old_style_output_data(router_output["bgpv6"])])
parsers = dict([(c["key"], c["parser"]) for c in juniper.shell_commands()])
neighbors = parsers["bgp"](
router_output["bgp"],
def _mocked_ssh_exec_commands(hostname, params, commands):
assert len(commands) == 2  # the disable-paging command + one show command
return [None, router_output["bgp"]]
mocker.patch(
'inventory_provider.juniper.ssh_exec_commands',
_mocked_ssh_exec_commands)
neighbors = juniper.fetch_bgp_config(
None,
None,
group_expression=r'^GEANT-IXv6[\s-].*$')
assert len(neighbors) == len(old_v6_data)
@@ -157,7 +173,14 @@ def test_ipv6_neighbors(router_output):
assert old_v6_data[address]["description"] == description
def test_juniper_shell_output_parsing(router_output):
COMMAND_HANDLERS = {
'bgp': juniper.fetch_bgp_config,
'vrr': juniper.fetch_vrr_config,
'interfaces': juniper.fetch_interfaces
}
def test_juniper_shell_output_parsing(mocker, router_output):
"""
just call the correct parser for each type of shell output
(not a proper test ... just verifies there's no crash)
@@ -166,7 +189,17 @@ def test_juniper_shell_output_parsing(router_output):
:param router_output:
:return:
"""
for c in juniper.shell_commands():
if c["key"] is None:
continue
c["parser"](router_output[c["key"]])
output = None
def _mocked_ssh_exec_commands(hostname, params, commands):
assert len(commands) == 2  # the disable-paging command + one show command
return [None, output]
mocker.patch(
'inventory_provider.juniper.ssh_exec_commands',
_mocked_ssh_exec_commands)
for key in ["bgp", "vrr", "interfaces"]:
output = router_output[key]
COMMAND_HANDLERS[key](None, None)