Skip to content
Snippets Groups Projects
config.py 4.44 KiB
import contextlib
import json
import re

import click
import mysql.connector
from pysnmp.hlapi import nextCmd, SnmpEngine, CommunityData, \
        UdpTransportTarget, ContextData, ObjectType, ObjectIdentity


def walk(agent_hostname, community, base_oid):
    """
    https://stackoverflow.com/a/45001921
    http://snmplabs.com/pysnmp/docs/hlapi/asyncore/sync/manager/cmdgen/nextcmd.html

    http://snmplabs.com/pysnmp/faq/pass-custom-mib-to-manager.html

    https://github.com/etingof/pysnmp/blob/master/examples/v3arch/asyncore/manager/cmdgen/getnext-multiple-oids-and-resolve-with-mib.py


    http://snmplabs.com/pysnmp/examples/smi/manager/browsing-mib-tree.html

    :param agent_hostname:
    :param community:
    :param base_oid:
    :return:
    """

    from pysnmp.smi import builder, view, compiler, rfc1902
    mibBuilder = builder.MibBuilder()
    mibViewController = view.MibViewController(mibBuilder)
    compiler.addMibCompiler(mibBuilder, sources=['http://mibs.snmplabs.com/asn1/@mib@'])
    # Pre-load MIB modules we expect to work with
    mibBuilder.loadModules('SNMPv2-MIB', 'SNMP-COMMUNITY-MIB', 'RFC1213-MIB')


    print("walking %s: %s" % (agent_hostname, base_oid))
    for (engineErrorIndication,
         pduErrorIndication,
         errorIndex,
         varBinds) in nextCmd(
            SnmpEngine(),
            CommunityData(community),
            UdpTransportTarget((agent_hostname, 161)),
            ContextData(),
            ObjectType(ObjectIdentity(base_oid)),
            lexicographicMode=False,
            lookupNames=True,
            lookupValues=True):
        assert not engineErrorIndication
        assert not pduErrorIndication
        assert errorIndex == 0
        varBinds = [
            rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]),x[1])
                .resolveWithMib(mibViewController)
            for x in varBinds]
        for oid, val in varBinds:
            print("\toid: '%s', val: '%s'" % (
                oid.prettyPrint(), val.prettyPrint()))

def _validate_config(ctx, param, value):
    """
    loads, validates and returns configuration parameters

    :param ctx:
    :param param:
    :param value:
    :return:
    """
    return json.loads(value.read())


def load_oids(config_file):
    """
    :param config_file: file-like object
    :return:
    """
    result = {}
    for line in config_file:
        m = re.match(r'^([^=]+)=(.*)\s*$', line)
        if m:
            result[m.group(1)] = m.group(2)
    return result


def load_routers(config_file):
    """
    :param config_file: file-like object
    :return:
    """
    for line in config_file:
        m = re.match(r'^([a-z\d]+\.[a-z\d]{3,4}\.[a-z\d]{2}\.(geant|eumedconnect)\d*\.net)\s*=([^,]+)\s*,(.*)\s*$', line)
        if not m:
            print("malformed config file line: '%s'" + line.strip())
            continue
        yield {
            "hostname": m.group(1),
            "community": m.group(3),
            "address": m.group(4)
        }

@contextlib.contextmanager
def connection(alarmsdb):
    cx = None
    try:
        cx = mysql.connector.connect(
            host=alarmsdb["hostname"],
            user=alarmsdb["username"],
            passwd=alarmsdb["password"],
            db=alarmsdb["dbname"])
        print(cx)
        yield cx
    finally:
        if cx:
            cx.close()


@contextlib.contextmanager
def cursor(cnx):
    csr = None
    try:
        csr = cnx.cursor()
        yield csr
    finally:
        if csr:
            csr.close()


def _db_test(db, router):
    with cursor(db) as crs:
        print(router)
        query = "SELECT absid FROM routers WHERE hostname = %s"
        crs.execute(query, (router['hostname'],))
        for (absid,) in crs:
            print(absid)


@click.command()
@click.option(
    "--config",
#    required=True,
    type=click.File(),
    help="Configuration filename",
    default=open("config.json"),
    callback=_validate_config)
def cli(config):
    with connection(config["alarms-db"]) as c:
        with open("routers_community.conf") as f:
            for r in load_routers(f):
                _db_test(c, r)
                # walk(r["address"], r["community"], ".1.3.6.1.2.1.4.20.1.1")
                walk(r["hostname"], r["community"], ".1.3.6.1.2.1.4.20.1.1")
                break


if __name__ == "__main__":
    cli()
    # with open("oid_list.conf") as f:
    #     print(load_oids(f))
    # with open("routers_community.conf") as f:
    #     for r in load_routers(f):
    #         _db_test(r)