import logging import re from pysnmp.hlapi import nextCmd, SnmpEngine, CommunityData, \ UdpTransportTarget, ContextData, ObjectType, ObjectIdentity from pysnmp.smi import builder, compiler # from pysnmp.smi import view, rfc1902 OID_LIST = { ## IPv4 'v4Address': '.1.3.6.1.2.1.4.20.1.1', 'v4InterfaceOID': '.1.3.6.1.2.1.4.20.1.2', 'v4InterfaceName': '.1.3.6.1.2.1.31.1.1.1.1', 'v4Mask': '.1.3.6.1.2.1.4.20.1.3', ## IPv6 'v6AddressAndMask' '.1.3.6.1.2.1.55.1.8.1.2' 'v6InterfaceName': '.1.3.6.1.2.1.55.1.5.1.2' } def _v6address_oid2str(dotted_decimal): hex_params = [] for dec in re.split(r'\.', dotted_decimal): hex_params.append("%02x" % int(dec)) return ":".join(hex_params) def walk(agent_hostname, community, base_oid): # pragma: no cover """ https://stackoverflow.com/a/45001921 http://snmplabs.com/pysnmp/docs/hlapi/asyncore/sync/manager/cmdgen/nextcmd.html http://snmplabs.com/pysnmp/faq/pass-custom-mib-to-manager.html https://github.com/etingof/pysnmp/blob/master/examples/v3arch/asyncore/manager/cmdgen/getnext-multiple-oids-and-resolve-with-mib.py http://snmplabs.com/pysnmp/examples/smi/manager/browsing-mib-tree.html :param agent_hostname: :param community: :param base_oid: :return: """ logger = logging.getLogger(__name__) mibBuilder = builder.MibBuilder() # mibViewController = view.MibViewController(mibBuilder) compiler.addMibCompiler( mibBuilder, sources=['http://mibs.snmplabs.com/asn1/@mib@']) # Pre-load MIB modules we expect to work with mibBuilder.loadModules( 'SNMPv2-MIB', 'SNMP-COMMUNITY-MIB', 'RFC1213-MIB') logger.debug("walking %s: %s" % (agent_hostname, base_oid)) for (engineErrorIndication, pduErrorIndication, errorIndex, varBinds) in nextCmd( SnmpEngine(), CommunityData(community), UdpTransportTarget((agent_hostname, 161)), ContextData(), ObjectType(ObjectIdentity(base_oid)), lexicographicMode=False, lookupNames=True, lookupValues=True): # cf. http://snmplabs.com/ # pysnmp/examples/hlapi/asyncore/sync/contents.html assert not engineErrorIndication, ( 'snmp response engine error indication: %r' % str(engineErrorIndication)) assert not pduErrorIndication, 'snmp response pdu error %r at %r' % ( pduErrorIndication, errorIndex and varBinds[int(errorIndex) - 1][0] or '?') assert errorIndex == 0, ( 'sanity failure: errorIndex != 0, ' 'but no error indication') # varBinds = [ # rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]),x[1]) # .resolveWithMib(mibViewController) # for x in varBinds] for oid, val in varBinds: result = {"oid": "." + str(oid), "value": val.prettyPrint()} logger.debug(result) yield result def get_router_snmp_indexes(hostname, community): indexes = {} RFC1213_MIB_ifDesc = '1.3.6.1.2.1.2.2.1.2' for ifc in walk(hostname, community, RFC1213_MIB_ifDesc): m = re.match(r'.*\.(\d+)$', ifc['oid']) assert m, 'sanity failure parsing oid: %r' % ifc['oid'] indexes[ifc['value']] = int(m.group(1)) return indexes def get_router_interfaces(hostname, community, config): oid_map = config["oids"] details = {} for name, oid in oid_map.items(): details[name] = walk(hostname, community, oid) details[name] = list(details[name]) v4IfcNames = {} for v4IfcName in details["v4InterfaceName"]: m = re.match(r'.*\.(\d+)$', v4IfcName["oid"]) assert m, "sanity failure parsing oid: " + v4IfcName["oid"] v4IfcNames[m.group(1)] = v4IfcName["value"] for v4Address, v4Mask, v4InterfaceOID in zip( details["v4Address"], details["v4Mask"], details["v4InterfaceOID"]): yield { "v4Address": v4Address["value"], "v4Mask": v4Mask["value"], "v4InterfaceName": v4IfcNames[v4InterfaceOID["value"]], "index": v4InterfaceOID["value"] } v6IfcNames = {} for v6InterfaceName in details["v6InterfaceName"]: m = re.match(r'.*\.(\d+)$', v6InterfaceName["oid"]) assert m, "sanity failure parsing oid: " + v6InterfaceName["oid"] v6IfcNames[m.group(1)] = v6InterfaceName["value"] for v6AddressAndMask in details["v6AddressAndMask"]: pattern = ( r'^' + oid_map["v6AddressAndMask"].replace(r'.', r'\.') + r'\.(\d+)\.(.+)$' ) m = re.match(pattern, v6AddressAndMask["oid"]) assert m, "sanity failure parsing oid: " + v6InterfaceName["oid"] yield { "v6Address": _v6address_oid2str(m.group(2)), "v6Mask": v6AddressAndMask["value"], "v6InterfaceName": v6IfcNames[m.group(1)], "index": m.group(1) } if __name__ == "__main__": hostname = 'mx1.ams.nl.geant.net' community = '0pBiFbD' interfaces = get_router_snmp_indexes(hostname, community) for name, index in interfaces.items(): print('%s: %s' % (name, index))