Commit a9e42edc authored by Erik Reid

use celery instead of multiprocessing module

parent cabfc5c7
@@ -9,7 +9,7 @@ from inventory_provider import constants
logging.basicConfig(level=logging.WARNING)
logging.getLogger(constants.SNMP_LOGGER_NAME).setLevel(logging.DEBUG)
logging.getLogger(constants.THREADING_LOGGER_NAME).setLevel(logging.INFO)
logging.getLogger(constants.TASK_LOGGER_NAME).setLevel(logging.INFO)
logging.getLogger(constants.JUNIPER_LOGGER_NAME).setLevel(logging.DEBUG)
logging.getLogger(constants.DATABASE_LOGGER_NAME).setLevel(logging.DEBUG)
...
SNMP_LOGGER_NAME = "snmp-logger"
THREADING_LOGGER_NAME = "threading-logger"
JUNIPER_LOGGER_NAME = "juniper-logger"
DATABASE_LOGGER_NAME = "database-logger"
TASK_LOGGER_NAME = "task-logger"
\ No newline at end of file
@@ -148,4 +148,4 @@ def fetch_interfaces(hostname, ssh_params):
    return _loads(
        output[1],
        object_pairs_hook=_dups_to_list) if output[1] else {}
\ No newline at end of file
        object_pairs_hook=_dups_to_list) if output[1] else {}
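
The fetch_interfaces hunk above parses its command output with _loads(..., object_pairs_hook=_dups_to_list), which suggests repeated keys in the returned JSON are preserved rather than silently overwritten as they would be in a plain dict. The hook itself is not part of this diff; the snippet below is only a minimal sketch of how such a hook can behave (the name dups_to_list and its exact behaviour are illustrative, not the project's implementation).

import json


def dups_to_list(pairs):
    # hypothetical stand-in for _dups_to_list: json's object_pairs_hook gets
    # each decoded object as a list of (key, value) pairs, so duplicate keys
    # can be collected into a list instead of being dropped
    result = {}
    for key, value in pairs:
        if key in result:
            if isinstance(result[key], list):
                result[key].append(value)
            else:
                result[key] = [result[key], value]
        else:
            result[key] = value
    return result


print(json.loads('{"unit": {"name": 0}, "unit": {"name": 1}}',
                 object_pairs_hook=dups_to_list))
# -> {'unit': [{'name': 0}, {'name': 1}]}
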
import json
import logging
from multiprocessing import Process, Queue

import redis

from inventory_provider import constants, juniper
from inventory_provider import snmp


# old multiprocessing-based helpers, replaced by celery tasks in this commit
def get_router_interfaces_q(router, params, q):
    threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME)
    threading_logger.debug("[ENTER>>] get_router_interfaces_q: %r" % router)
    q.put(list(snmp.get_router_interfaces(router, params)))
    threading_logger.debug("[<<EXIT] get_router_interfaces_q: %r" % router)


def ssh_exec_commands_q(hostname, ssh_params, commands, q):
    threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME)
    threading_logger.debug("[ENTER>>] exec_router_commands_q: %r" % hostname)
    q.put(list(juniper.ssh_exec_commands(hostname, ssh_params, commands)))
    threading_logger.debug("[<<EXIT] exec_router_commands_q: %r" % hostname)
# old get_router_details (removed): fans out per-router work to child
# processes and collects the results via queues
def get_router_details(router, params):
    threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME)
    threading_logger.debug("[ENTER>>]get_router_details: %r" % router)

    commands = list(juniper.shell_commands())

    snmpifc_proc_queue = Queue()
    snmpifc_proc = Process(
        target=get_router_interfaces_q,
        args=(router, params, snmpifc_proc_queue))
    snmpifc_proc.start()

    commands_proc_queue = Queue()
    commands_proc = Process(
        target=ssh_exec_commands_q,
        args=(
            router["hostname"],
            params["ssh"],
            [c["command"] for c in commands],
            commands_proc_queue))
    commands_proc.start()

    threading_logger.debug("waiting for commands result: %r" % router)
    command_output = commands_proc_queue.get()
    assert len(command_output) == len(commands)

    r = redis.StrictRedis(
        host=params["redis"]["hostname"],
        port=params["redis"]["port"])
    for c, o in zip(commands, command_output):
        if c["key"]:
            r.hset(
                name=router["hostname"],
                key=c["key"],
                value=json.dumps(c["parser"](o)))
    commands_proc.join()
    threading_logger.debug("... got commands result & joined: %r" % router)

    threading_logger.debug("waiting for snmp ifc results: %r" % router)
    r.hset(
        name=router["hostname"],
        key="snmp-interfaces",
        value=json.dumps(snmpifc_proc_queue.get()))
    snmpifc_proc.join()
    threading_logger.debug("... got snmp ifc result & joined: %r" % router)
    threading_logger.debug("[<<EXIT]get_router_details: %r" % router)
# new in this commit: get_router_details now just enqueues celery tasks by
# name; the actual work is done in inventory_provider.tasks.worker
from inventory_provider.tasks.app import app
from inventory_provider.constants import TASK_LOGGER_NAME


def get_router_details(router):
    task_logger = logging.getLogger(TASK_LOGGER_NAME)

    task_logger.debug("launching task: "
                      "inventory_provider.tasks.worker.juniper_refresh_bgp")
    app.send_task(
        'inventory_provider.tasks.worker.juniper_refresh_bgp',
        args=[router["hostname"]])

    task_logger.debug("launching task: "
                      "inventory_provider.tasks.worker.juniper_refresh_vrr")
    app.send_task(
        'inventory_provider.tasks.worker.juniper_refresh_vrr',
        args=[router["hostname"]])

    task_logger.debug("launching task: "
                      "inventory_provider"
                      ".tasks.worker.juniper_refresh_interfaces")
    app.send_task(
        'inventory_provider.tasks.worker.juniper_refresh_interfaces',
        args=[router["hostname"]])

    task_logger.debug("launching task: "
                      "inventory_provider"
                      ".tasks.worker.snmp_refresh_interfaces")
    app.send_task(
        'inventory_provider.tasks.worker.snmp_refresh_interfaces',
        args=[router["hostname"], router["community"]])
# old update_network_details (removed): one child process per router, each
# joined in turn
def update_network_details(params):
    threading_logger = logging.getLogger(constants.THREADING_LOGGER_NAME)
    processes = []
    for r in params["routers"]:
        p = Process(target=get_router_details, args=(r, params))
        p.start()
        processes.append({"router": r, "process": p})

    result = {}
    for p in processes:
        threading_logger.debug(
            "waiting for get_router_details result: %r" % p["router"])
        p["process"].join()
        threading_logger.debug(
            "got result and joined get_router_details proc: %r" % p["router"])
    return result


# new update_network_details: only asks get_router_details to enqueue tasks
def update_network_details(params):
    task_logger = logging.getLogger(TASK_LOGGER_NAME)
    for r in params["routers"]:
        task_logger.info("fetching router details for: %r" % r)
        get_router_details(r)
def load_network_details(redis_params):

@@ -110,3 +56,15 @@ def load_network_details(redis_params):
        result[hostname.decode("utf-8")] = host
    return result
if __name__ == "__main__":
    from inventory_provider import config
    with open("config.json") as f:
        params = config.load(f)
    # update_network_details(params)
    network_info = load_network_details(params["redis"])
    with open("./router-info.json", "w") as f:
        f.write(json.dumps(network_info))
\ No newline at end of file
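
The new code only enqueues work by name with app.send_task; the celery application (inventory_provider.tasks.app) and the worker tasks it names (inventory_provider.tasks.worker.juniper_refresh_bgp, juniper_refresh_vrr, juniper_refresh_interfaces, snmp_refresh_interfaces) are referenced but not shown in this excerpt. A minimal sketch of how such an app/task pair is typically wired, assuming a redis broker and collapsing the module layout into one file for brevity:

import logging

from celery import Celery

from inventory_provider import constants

# assumption: a redis broker; the real broker/backend settings are not in
# this diff
app = Celery('inventory_provider.tasks', broker='redis://localhost:6379/0')


@app.task(name='inventory_provider.tasks.worker.snmp_refresh_interfaces')
def snmp_refresh_interfaces(hostname, community):
    # placeholder body: the real task presumably performs the SNMP interface
    # walk that get_router_interfaces_q used to run in a child process
    task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
    task_logger.info(
        "snmp_refresh_interfaces(%r, %r)" % (hostname, community))

With something like this registered, app.send_task('inventory_provider.tasks.worker.snmp_refresh_interfaces', args=[hostname, community]) in the dispatcher above resolves the task by name, and a worker started with "celery -A inventory_provider.tasks.worker worker" picks it up and runs it asynchronously.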