Skip to content
Snippets Groups Projects
snmp.py 4.91 KiB
import logging
import re

from pysnmp.hlapi import nextCmd, SnmpEngine, CommunityData, \
        UdpTransportTarget, ContextData, ObjectType, ObjectIdentity
from pysnmp.smi import builder, compiler
# from pysnmp.smi import view, rfc1902


# Base OID walked by get_router_snmp_indexes() to enumerate interfaces;
# per the name this is the RFC1213-MIB interface description column
# (ifDescr) -- the trailing component of each returned OID is used as the
# interface's SNMP index.
RFC1213_MIB_IFDESC = '1.3.6.1.2.1.2.2.1.2'


def _v6address_oid2str(dotted_decimal):
    hex_params = []
    for dec in re.split(r'\.', dotted_decimal):
        hex_params.append("%02x" % int(dec))
    return ":".join(hex_params)


def walk(agent_hostname, community, base_oid):  # pragma: no cover
    """
    Walk an SNMP subtree on an agent and yield every variable binding
    returned, using pysnmp's synchronous ``nextCmd`` against UDP port 161.

    https://stackoverflow.com/a/45001921
    http://snmplabs.com/pysnmp/docs/hlapi/asyncore/sync/manager/cmdgen/nextcmd.html
    http://snmplabs.com/pysnmp/faq/pass-custom-mib-to-manager.html
    https://github.com/etingof/pysnmp/blob/master/examples/v3arch/asyncore/manager/cmdgen/getnext-multiple-oids-and-resolve-with-mib.py
    http://snmplabs.com/pysnmp/examples/smi/manager/browsing-mib-tree.html

    :param agent_hostname: host name (or address) of the SNMP agent
    :param community: SNMP community string used for the request
    :param base_oid: dotted OID string at which the walk starts
    :return: generator of dicts ``{"oid": ..., "value": ...}`` where
        ``oid`` is the returned OID as a string prefixed with "." and
        ``value`` is the result of ``prettyPrint()`` on the returned value
    :raises AssertionError: if a response carries an engine error
        indication, a PDU error, or a non-zero error index
    """

    logger = logging.getLogger(__name__)

    # NOTE(review): mibBuilder is not handed to nextCmd/SnmpEngine below;
    # the compiler registration and module loading are kept as in the
    # original for their side effects -- confirm whether still required.
    mibBuilder = builder.MibBuilder()
    # mibViewController = view.MibViewController(mibBuilder)
    compiler.addMibCompiler(
        mibBuilder,
        sources=['http://mibs.snmplabs.com/asn1/@mib@'])
    # Pre-load MIB modules we expect to work with
    mibBuilder.loadModules(
        'SNMPv2-MIB',
        'SNMP-COMMUNITY-MIB',
        'RFC1213-MIB')

    # lazy %-args: the message is only formatted if DEBUG is enabled
    logger.debug("walking %s: %s", agent_hostname, base_oid)

    for (engineErrorIndication,
         pduErrorIndication,
         errorIndex,
         varBinds) in nextCmd(
            SnmpEngine(),
            CommunityData(community),
            UdpTransportTarget((agent_hostname, 161)),
            ContextData(),
            ObjectType(ObjectIdentity(base_oid)),
            lexicographicMode=False,
            lookupNames=True,
            lookupValues=True):

        # cf. http://snmplabs.com/
        #       pysnmp/examples/hlapi/asyncore/sync/contents.html
        #
        # These checks are raised explicitly (rather than written as
        # ``assert`` statements) so they are not stripped when Python
        # runs with -O; AssertionError is kept for existing callers.
        if engineErrorIndication:
            raise AssertionError(
                'snmp response engine error indication: %r'
                % str(engineErrorIndication))
        if pduErrorIndication:
            # errorIndex is 1-based; 0 means no specific binding is blamed
            failed_oid = (
                varBinds[int(errorIndex) - 1][0] if errorIndex else '?')
            raise AssertionError(
                'snmp response pdu error %r at %r' % (
                    pduErrorIndication, failed_oid))
        if errorIndex != 0:
            raise AssertionError(
                'sanity failure: errorIndex != 0, '
                'but no error indication')

        # varBinds = [
        #     rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]),x[1])
        #         .resolveWithMib(mibViewController)
        #     for x in varBinds]
        for oid, val in varBinds:
            result = {"oid": "." + str(oid), "value": val.prettyPrint()}
            logger.debug(result)
            yield result


def get_router_snmp_indexes(hostname, community):
    """
    Map each interface description on a router to its SNMP index.

    Walks RFC1213_MIB_IFDESC on the router and, for every row returned,
    takes the final numeric component of the row's OID as the index.

    :param hostname: router to query
    :param community: SNMP community string
    :return: dict of ``{walked value: int(last OID component)}``
    :raises AssertionError: if a returned OID does not end in ".<digits>"
    """
    trailing_index = re.compile(r'.*\.(\d+)$')
    indexes = {}
    for row in walk(hostname, community, RFC1213_MIB_IFDESC):
        oid = row['oid']
        found = trailing_index.match(oid)
        assert found, 'sanity failure parsing oid: %r' % oid
        indexes[row['value']] = int(found.group(1))
    return indexes

def _interface_names_by_index(name_rows):
    """
    Map the trailing numeric OID component of each walked row to its value.

    :param name_rows: iterable of ``{"oid": ..., "value": ...}`` dicts
    :return: dict of ``{index (str): value}``
    :raises AssertionError: if an OID does not end in ".<digits>"
    """
    names = {}
    for row in name_rows:
        m = re.match(r'.*\.(\d+)$', row["oid"])
        if not m:
            # explicit raise so the check survives ``python -O``
            raise AssertionError(
                "sanity failure parsing oid: " + row["oid"])
        names[m.group(1)] = row["value"]
    return names


def get_router_interfaces(hostname, community, config):
    """
    Yield the IPv4, then the IPv6, address records found on a router.

    Every OID in ``config["oids"]`` is walked and fully collected first;
    the records are then assembled from those results.

    :param hostname: router to query
    :param community: SNMP community string
    :param config: mapping with an "oids" entry that maps the names
        "v4Address", "v4Mask", "v4InterfaceOID", "v4InterfaceName",
        "v6AddressAndMask" and "v6InterfaceName" to the OIDs to walk
    :return: generator of dicts, first one per IPv4 address with keys
        "v4Address", "v4Mask", "v4InterfaceName", "index", then one per
        IPv6 address with keys "v6Address", "v6Mask", "v6InterfaceName",
        "index" ("index" is a string in both cases)
    :raises AssertionError: if a walked OID has an unexpected format
    :raises KeyError: if a required OID name is missing from the config,
        or an address refers to an interface index with no name entry
    """
    oid_map = config["oids"]

    details = {
        name: list(walk(hostname, community, oid))
        for name, oid in oid_map.items()
    }

    v4IfcNames = _interface_names_by_index(details["v4InterfaceName"])

    # the three v4 walks are paired up positionally, i.e. this relies on
    # the agent returning their rows in the same order
    for v4Address, v4Mask, v4InterfaceOID in zip(
            details["v4Address"],
            details["v4Mask"],
            details["v4InterfaceOID"]):
        yield {
            "v4Address": v4Address["value"],
            "v4Mask": v4Mask["value"],
            "v4InterfaceName": v4IfcNames[v4InterfaceOID["value"]],
            "index": v4InterfaceOID["value"]
        }

    v6IfcNames = _interface_names_by_index(details["v6InterfaceName"])

    # Row OIDs are expected to look like
    #   <v6AddressAndMask base oid>.<interface index>.<address components>
    # The pattern does not change per row, so build it once.
    v6_pattern = re.compile(
        r'^'
        + re.escape(oid_map["v6AddressAndMask"])
        + r'\.(\d+)\.(.+)$'
    )

    for v6AddressAndMask in details["v6AddressAndMask"]:
        m = v6_pattern.match(v6AddressAndMask["oid"])
        if not m:
            # report the OID that actually failed to parse
            raise AssertionError(
                "sanity failure parsing oid: " + v6AddressAndMask["oid"])
        yield {
            "v6Address": _v6address_oid2str(m.group(2)),
            "v6Mask": v6AddressAndMask["value"],
            "v6InterfaceName": v6IfcNames[m.group(1)],
            "index": m.group(1)
        }

if __name__ == "__main__":
    # Manual smoke test: print "<interface description>: <snmp index>"
    # for every interface reported by one router.
    # NOTE(review): the router name and the SNMP community string are
    # hard-coded in source here -- consider supplying them externally.
    hostname = 'mx1.ams.nl.geant.net'
    community = '0pBiFbD'
    for ifc_name, snmp_index in get_router_snmp_indexes(
            hostname, community).items():
        print('%s: %s' % (ifc_name, snmp_index))