Skip to content
Snippets Groups Projects
Commit 34e97e64 authored by Erik Reid's avatar Erik Reid
Browse files

initial working celery tasks

parent 6e5a0401
No related branches found
No related tags found
No related merge requests found
......@@ -80,7 +80,51 @@ def ssh_exec_commands(hostname, ssh_params, commands):
yield stdout.read().decode("utf-8")
def shell_commands():
def _loads(s, **args):
"""
the json text contains raw backslashes
:param s:
:param args:
:return:
"""
return json.loads(s.replace("\\", "\\\\"), **args)
_DISABLE_PAGING_COMMAND = r'set cli screen-length 0'
def fetch_bgp_config(hostname, ssh_params, **args):
commands = [
_DISABLE_PAGING_COMMAND,
('show configuration routing-instances'
' IAS protocols bgp | display json')
]
output = list(ssh_exec_commands(hostname, ssh_params, commands))
assert len(output) == len(commands)
if output[1]:
return list(neighbors(_loads(output[1]), **args))
else:
return {}
def fetch_vrr_config(hostname, ssh_params):
commands = [
_DISABLE_PAGING_COMMAND,
('show configuration logical-systems '
'VRR protocols bgp | display json')
]
output = list(ssh_exec_commands(hostname, ssh_params, commands))
assert len(output) == len(commands)
return _loads(output[1]) if output[1] else {}
def fetch_interfaces(hostname, ssh_params):
def _dups_to_list(pairs):
counter_map = {}
......@@ -94,47 +138,14 @@ def shell_commands():
result[k] = v
return result
def _loads(s, **args):
"""
the json text contains raw backslashes
:param s:
:param args:
:return:
"""
return json.loads(s.replace("\\", "\\\\"), **args)
yield {
"command": "set cli screen-length 0",
"key": None,
"parser": lambda _: None
}
def _parse_bgp_output(txt, **args):
if txt:
return list(neighbors(_loads(txt), **args))
else:
return {}
yield {
"command": ('show configuration routing-instances'
' IAS protocols bgp | display json'),
"key": "bgp",
"parser": _parse_bgp_output
}
yield {
"command": ('show configuration logical-systems '
'VRR protocols bgp | display json'),
"key": "vrr",
"parser": lambda txt: _loads(txt) if txt else {}
}
yield {
"command": 'show interfaces descriptions | display json',
"key": "interfaces",
"parser": lambda txt: list(interfaces(
_loads(
txt,
object_pairs_hook=_dups_to_list)
)) if txt else {}
}
commands = [
_DISABLE_PAGING_COMMAND,
'show interfaces descriptions | display json'
]
output = list(ssh_exec_commands(hostname, ssh_params, commands))
assert len(output) == len(commands)
return _loads(
output[1],
object_pairs_hook=_dups_to_list) if output[1] else {}
\ No newline at end of file
......@@ -4,9 +4,8 @@ from multiprocessing import Process, Queue
import redis
from inventory_provider import constants
from inventory_provider import constants, juniper
from inventory_provider import snmp
from inventory_provider import juniper
def get_router_interfaces_q(router, params, q):
......
from celery import Celery
from celery.bin import Option
app = Celery("app")
app.config_from_object("inventory_provider.tasks.config")
from os import getenv
broker_url = getenv(
'CELERY_BROKER_URL',
default='redis://test-dashboard02.geant.org:6379/1')
import json
from celery import bootsteps, Task
import redis
from inventory_provider.tasks.app import app
from inventory_provider import config
from inventory_provider import juniper
class InventoryTask(Task):
config = None
def __init__(self):
pass
@staticmethod
def save_key(hostname, key, data):
r = redis.StrictRedis(
host=InventoryTask.config["redis"]["hostname"],
port=InventoryTask.config["redis"]["port"])
r.hset(
name=hostname,
key=key,
value=json.dumps(data))
return "OK"
class WorkerArgs(bootsteps.Step):
def __init__(self, worker, config_filename, **options):
with open(config_filename) as f:
InventoryTask.config = config.load(f)
def worker_args(parser):
parser.add_argument(
"--config_filename",
dest="config_filename",
action='store',
type=str,
help="Configuration filename")
app.user_options['worker'].add(worker_args)
app.steps['worker'].add(WorkerArgs)
@app.task(bind=InventoryTask)
def juniper_refresh_bgp(self, hostname):
InventoryTask.save_key(
hostname,
"bgp",
juniper.fetch_bgp_config(hostname, InventoryTask.config["ssh"]))
@app.task(bind=InventoryTask)
def juniper_refresh_vrr(self, hostname):
InventoryTask.save_key(
hostname,
"vrr",
juniper.fetch_vrr_config(hostname, InventoryTask.config["ssh"]))
@app.task(bind=InventoryTask)
def juniper_refresh_interfaces(self, hostname):
InventoryTask.save_key(
hostname,
"interfaces",
juniper.fetch_interfaces(hostname, InventoryTask.config["ssh"]))
......@@ -5,6 +5,7 @@ jsonschema
paramiko
flask
redis
celery
pytest
pytest-mock
......@@ -16,6 +16,7 @@ setup(
'jsonschema',
'paramiko',
'flask',
'redis'
'redis',
'celery'
]
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment