config.py 4.44 KiB
import contextlib
import json
import re
import click
import mysql.connector
from pysnmp.hlapi import nextCmd, SnmpEngine, CommunityData, \
UdpTransportTarget, ContextData, ObjectType, ObjectIdentity
def walk(agent_hostname, community, base_oid):
"""
https://stackoverflow.com/a/45001921
http://snmplabs.com/pysnmp/docs/hlapi/asyncore/sync/manager/cmdgen/nextcmd.html
http://snmplabs.com/pysnmp/faq/pass-custom-mib-to-manager.html
https://github.com/etingof/pysnmp/blob/master/examples/v3arch/asyncore/manager/cmdgen/getnext-multiple-oids-and-resolve-with-mib.py
http://snmplabs.com/pysnmp/examples/smi/manager/browsing-mib-tree.html
:param agent_hostname:
:param community:
:param base_oid:
:return:
"""
from pysnmp.smi import builder, view, compiler, rfc1902
mibBuilder = builder.MibBuilder()
mibViewController = view.MibViewController(mibBuilder)
compiler.addMibCompiler(mibBuilder, sources=['http://mibs.snmplabs.com/asn1/@mib@'])
# Pre-load MIB modules we expect to work with
mibBuilder.loadModules('SNMPv2-MIB', 'SNMP-COMMUNITY-MIB', 'RFC1213-MIB')
print("walking %s: %s" % (agent_hostname, base_oid))
for (engineErrorIndication,
pduErrorIndication,
errorIndex,
varBinds) in nextCmd(
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((agent_hostname, 161)),
ContextData(),
ObjectType(ObjectIdentity(base_oid)),
lexicographicMode=False,
lookupNames=True,
lookupValues=True):
assert not engineErrorIndication
assert not pduErrorIndication
assert errorIndex == 0
varBinds = [
rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]),x[1])
.resolveWithMib(mibViewController)
for x in varBinds]
for oid, val in varBinds:
print("\toid: '%s', val: '%s'" % (
oid.prettyPrint(), val.prettyPrint()))
def _validate_config(ctx, param, value):
"""
loads, validates and returns configuration parameters
:param ctx:
:param param:
:param value:
:return:
"""
return json.loads(value.read())
def load_oids(config_file):
"""
:param config_file: file-like object
:return:
"""
result = {}
for line in config_file:
m = re.match(r'^([^=]+)=(.*)\s*$', line)
if m:
result[m.group(1)] = m.group(2)
return result
def load_routers(config_file):
"""
:param config_file: file-like object
:return:
"""
for line in config_file:
m = re.match(r'^([a-z\d]+\.[a-z\d]{3,4}\.[a-z\d]{2}\.(geant|eumedconnect)\d*\.net)\s*=([^,]+)\s*,(.*)\s*$', line)
if not m:
print("malformed config file line: '%s'" + line.strip())
continue
yield {
"hostname": m.group(1),
"community": m.group(3),
"address": m.group(4)
}
@contextlib.contextmanager
def connection(alarmsdb):
cx = None
try:
cx = mysql.connector.connect(
host=alarmsdb["hostname"],
user=alarmsdb["username"],
passwd=alarmsdb["password"],
db=alarmsdb["dbname"])
print(cx)
yield cx
finally:
if cx:
cx.close()
@contextlib.contextmanager
def cursor(cnx):
csr = None
try:
csr = cnx.cursor()
yield csr
finally:
if csr:
csr.close()
def _db_test(db, router):
with cursor(db) as crs:
print(router)
query = "SELECT absid FROM routers WHERE hostname = %s"
crs.execute(query, (router['hostname'],))
for (absid,) in crs:
print(absid)
@click.command()
@click.option(
"--config",
# required=True,
type=click.File(),
help="Configuration filename",
default=open("config.json"),
callback=_validate_config)
def cli(config):
with connection(config["alarms-db"]) as c:
with open("routers_community.conf") as f:
for r in load_routers(f):
_db_test(c, r)
# walk(r["address"], r["community"], ".1.3.6.1.2.1.4.20.1.1")
walk(r["hostname"], r["community"], ".1.3.6.1.2.1.4.20.1.1")
break
if __name__ == "__main__":
cli()
# with open("oid_list.conf") as f:
# print(load_oids(f))
# with open("routers_community.conf") as f:
# for r in load_routers(f):
# _db_test(r)