import contextlib
import json
import re

import click
import mysql.connector
from pysnmp.hlapi import nextCmd, SnmpEngine, CommunityData, \
    UdpTransportTarget, ContextData, ObjectType, ObjectIdentity

# "<key>=<value>": the lazy value group lets the trailing \s* actually
# consume trailing whitespace instead of leaving it in the value.
_OID_LINE_RE = re.compile(r'^([^=]+)=(.*?)\s*$')

# "<hostname> = <community> , <address>": hostname must look like
# xxx.yyy[y].zz.(geant|eumedconnect)N.net.  Lazy groups plus \s* around the
# separators keep stray whitespace out of the community and address.
_ROUTER_LINE_RE = re.compile(
    r'^(?P<hostname>[a-z\d]+\.[a-z\d]{3,4}\.[a-z\d]{2}'
    r'\.(?:geant|eumedconnect)\d*\.net)'
    r'\s*=\s*(?P<community>[^,]+?)\s*,\s*(?P<address>.*?)\s*$')


def walk(agent_hostname, community, base_oid):
    """
    Walk the SNMP subtree rooted at ``base_oid`` and print every
    oid/value pair found, resolved against the loaded MIBs.

    References:
    https://stackoverflow.com/a/45001921
    http://snmplabs.com/pysnmp/docs/hlapi/asyncore/sync/manager/cmdgen/nextcmd.html
    http://snmplabs.com/pysnmp/faq/pass-custom-mib-to-manager.html
    https://github.com/etingof/pysnmp/blob/master/examples/v3arch/asyncore/manager/cmdgen/getnext-multiple-oids-and-resolve-with-mib.py
    http://snmplabs.com/pysnmp/examples/smi/manager/browsing-mib-tree.html

    :param agent_hostname: hostname or address of the SNMP agent
        (queried over UDP port 161)
    :param community: SNMP community string
    :param base_oid: oid at which the walk starts
    :return: None, results are printed to stdout
    :raises RuntimeError: if the SNMP engine or the agent reports an error
    """
    from pysnmp.smi import builder, view, compiler, rfc1902

    mibBuilder = builder.MibBuilder()
    mibViewController = view.MibViewController(mibBuilder)
    compiler.addMibCompiler(
        mibBuilder,
        sources=['http://mibs.snmplabs.com/asn1/@mib@'])
    # Pre-load MIB modules we expect to work with
    mibBuilder.loadModules(
        'SNMPv2-MIB', 'SNMP-COMMUNITY-MIB', 'RFC1213-MIB')

    print("walking %s: %s" % (agent_hostname, base_oid))

    # lexicographicMode=False stops the iteration once the responses
    # leave the subtree below base_oid.
    responses = nextCmd(
        SnmpEngine(),
        CommunityData(community),
        UdpTransportTarget((agent_hostname, 161)),
        ContextData(),
        ObjectType(ObjectIdentity(base_oid)),
        lexicographicMode=False,
        lookupNames=True,
        lookupValues=True)

    for (engineErrorIndication,
            pduErrorIndication,
            errorIndex,
            varBinds) in responses:

        # Explicit raises rather than asserts: asserts are removed when
        # Python runs with -O, which would silently hide SNMP failures.
        if engineErrorIndication:
            raise RuntimeError(
                "snmp engine error walking %s: %s"
                % (agent_hostname, engineErrorIndication))
        if pduErrorIndication:
            raise RuntimeError(
                "snmp pdu error walking %s: %s"
                % (agent_hostname, pduErrorIndication))
        if errorIndex != 0:
            raise RuntimeError(
                "snmp error walking %s: non-zero error index %s"
                % (agent_hostname, errorIndex))

        # Re-resolve each binding against our own MIB view so that the
        # pretty-printed names use the modules loaded above.
        varBinds = [
            rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]), x[1])
            .resolveWithMib(mibViewController)
            for x in varBinds]

        for oid, val in varBinds:
            print("\toid: '%s', val: '%s'" % (
                oid.prettyPrint(), val.prettyPrint()))


def _validate_config(ctx, param, value):
    """
    click option callback: reads the opened configuration file and
    returns its contents parsed as json

    :param ctx: click context (unused)
    :param param: click parameter being processed (unused)
    :param value: readable file-like object holding json text
    :return: the decoded json document
    """
    return json.loads(value.read())


def load_oids(config_file):
    """
    Parses ``key=value`` lines into a dict.  Lines that contain no ``=``
    are ignored; trailing whitespace is removed from the value.

    :param config_file: file-like object
    :return: dict mapping each key to its value (both strings)
    """
    result = {}
    for line in config_file:
        m = _OID_LINE_RE.match(line)
        if m:
            result[m.group(1)] = m.group(2)
    return result


def load_routers(config_file):
    """
    Generator over the router definitions found in ``config_file``.

    Each line is expected to look like ``hostname=community,address``.
    Lines that don't match are reported on stdout and skipped.

    :param config_file: file-like object
    :return: yields dicts with keys "hostname", "community", "address"
    """
    for line in config_file:
        m = _ROUTER_LINE_RE.match(line)
        if not m:
            print("malformed config file line: '%s'" % line.strip())
            continue
        yield {
            "hostname": m.group("hostname"),
            "community": m.group("community"),
            "address": m.group("address")
        }


@contextlib.contextmanager
def connection(alarmsdb):
    """
    Context manager yielding a mysql connection that is closed on exit,
    whether or not the managed block raised.

    :param alarmsdb: dict with keys "hostname", "username", "password"
        and "dbname"
    :return: yields the open connection
    """
    cx = None
    try:
        cx = mysql.connector.connect(
            host=alarmsdb["hostname"],
            user=alarmsdb["username"],
            passwd=alarmsdb["password"],
            db=alarmsdb["dbname"])
        print(cx)
        yield cx
    finally:
        # cx is still None if connect() itself failed
        if cx is not None:
            cx.close()


@contextlib.contextmanager
def cursor(cnx):
    """
    Context manager yielding a cursor on ``cnx`` that is closed on exit.

    :param cnx: open database connection
    :return: yields the cursor
    """
    csr = None
    try:
        csr = cnx.cursor()
        yield csr
    finally:
        # csr is still None if cnx.cursor() itself failed
        if csr is not None:
            csr.close()


def _db_test(db, router):
    """
    Prints ``router`` followed by the absid of every row in the
    ``routers`` table whose hostname matches it.

    :param db: open database connection
    :param router: dict with at least a "hostname" key
    :return: None
    """
    with cursor(db) as crs:
        print(router)
        # parameterized query: the driver does the quoting
        query = "SELECT absid FROM routers WHERE hostname = %s"
        crs.execute(query, (router['hostname'],))
        for (absid,) in crs:
            print(absid)


@click.command()
@click.option(
    "--config",
    type=click.File(),
    help="Configuration filename",
    # A filename rather than an already-open file: click.File opens it
    # when the command runs, so importing this module neither needs
    # config.json to exist nor leaks a file handle.
    default="config.json",
    callback=_validate_config)
def cli(config):
    """
    Looks up the first router listed in routers_community.conf in the
    alarms database and walks its ip address table over snmp.

    :param config: decoded json configuration, must contain the
        database settings under "alarms-db"
    :return: None
    """
    with connection(config["alarms-db"]) as c:
        with open("routers_community.conf") as f:
            for r in load_routers(f):
                _db_test(c, r)
                # r["address"] could be used here instead of the hostname
                walk(r["hostname"], r["community"], ".1.3.6.1.2.1.4.20.1.1")
                # only the first well-formed router is processed
                break


if __name__ == "__main__":
    cli()