snmp.py 3.84 KiB
import logging
import re
from pysnmp.hlapi import nextCmd, SnmpEngine, CommunityData, \
UdpTransportTarget, ContextData, ObjectType, ObjectIdentity
from pysnmp.smi import builder, compiler
# from pysnmp.smi import view, rfc1902
from inventory_provider.constants import SNMP_LOGGER_NAME
def _v6address_oid2str(dotted_decimal):
hex_params = []
for dec in re.split(r'\.', dotted_decimal):
hex_params.append("%02x" % int(dec))
return ":".join(hex_params)
def walk(agent_hostname, community, base_oid):
"""
https://stackoverflow.com/a/45001921
http://snmplabs.com/pysnmp/docs/hlapi/asyncore/sync/manager/cmdgen/nextcmd.html
http://snmplabs.com/pysnmp/faq/pass-custom-mib-to-manager.html
https://github.com/etingof/pysnmp/blob/master/examples/v3arch/asyncore/manager/cmdgen/getnext-multiple-oids-and-resolve-with-mib.py
http://snmplabs.com/pysnmp/examples/smi/manager/browsing-mib-tree.html
:param agent_hostname:
:param community:
:param base_oid:
:return:
"""
snmp_logger = logging.getLogger(SNMP_LOGGER_NAME)
mibBuilder = builder.MibBuilder()
# mibViewController = view.MibViewController(mibBuilder)
compiler.addMibCompiler(
mibBuilder,
sources=['http://mibs.snmplabs.com/asn1/@mib@'])
# Pre-load MIB modules we expect to work with
mibBuilder.loadModules(
'SNMPv2-MIB',
'SNMP-COMMUNITY-MIB',
'RFC1213-MIB')
snmp_logger.debug("walking %s: %s" % (agent_hostname, base_oid))
for (engineErrorIndication,
pduErrorIndication,
errorIndex,
varBinds) in nextCmd(
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((agent_hostname, 161)),
ContextData(),
ObjectType(ObjectIdentity(base_oid)),
lexicographicMode=False,
lookupNames=True,
lookupValues=True):
assert not engineErrorIndication
assert not pduErrorIndication
assert errorIndex == 0
# varBinds = [
# rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]),x[1])
# .resolveWithMib(mibViewController)
# for x in varBinds]
for oid, val in varBinds:
yield {"oid": "." + str(oid), "value": val.prettyPrint()}
def get_router_interfaces(router, config):
oid_map = config["oids"]
details = {}
for name, oid in oid_map.items():
details[name] = walk(router["hostname"], router["community"], oid)
details[name] = list(details[name])
v4IfcNames = {}
for v4IfcName in details["v4InterfaceName"]:
m = re.match(r'.*\.(\d+)$', v4IfcName["oid"])
assert m, "sanity failure parsing oid: " + v4IfcName["oid"]
v4IfcNames[m.group(1)] = v4IfcName["value"]
for v4Address, v4Mask, v4InterfaceOID in zip(
details["v4Address"],
details["v4Mask"],
details["v4InterfaceOID"]):
yield {
"v4Address": v4Address["value"],
"v4Mask": v4Mask["value"],
"v4InterfaceName": v4IfcNames[v4InterfaceOID["value"]]
}
v6IfcNames = {}
for v6InterfaceName in details["v6InterfaceName"]:
m = re.match(r'.*\.(\d+)$', v6InterfaceName["oid"])
assert m, "sanity failure parsing oid: " + v6InterfaceName["oid"]
v6IfcNames[m.group(1)] = v6InterfaceName["value"]
for v6AddressAndMask in details["v6AddressAndMask"]:
pattern = (
r'^'
+ oid_map["v6AddressAndMask"].replace(r'.', r'\.')
+ r'\.(\d+)\.(.+)$'
)
m = re.match(pattern, v6AddressAndMask["oid"])
assert m, "sanity failure parsing oid: " + v6InterfaceName["oid"]
yield {
"v6Address": _v6address_oid2str(m.group(2)),
"v6Mask": v6AddressAndMask["value"],
"v6InterfaceName": v6IfcNames[m.group(1)]
}