# snmp.py
import ipaddress
import logging
import re
import struct
from pysnmp.hlapi import nextCmd, SnmpEngine, CommunityData, \
UdpTransportTarget, ContextData, ObjectType, ObjectIdentity
from pysnmp.smi import builder, compiler
from pysnmp.error import PySnmpError
# from pysnmp.smi import view, rfc1902
# base oid walked by _get_router_snmp_indexes: the returned values are
# used as interface names, the last oid component as the snmp index
RFC1213_MIB_IFDESC = '1.3.6.1.2.1.2.2.1.2'
# BGP4-V2-MIB-JUNIPER::jnxBgpM2PeerState
# (base oid walked by _get_peer_state_info)
JNX_BGP_M2_PEER_STATE = '1.3.6.1.4.1.2636.5.1.1.2.1.1.1.2'
# module-level logger, named after this module
logger = logging.getLogger(__name__)
class SNMPWalkError(ConnectionError):
    """Raised by walk() when an snmp walk fails or reports an error."""
def _cast_snmp_value(value):
"""
Cast things to the simplest native type.
:param value:
:return:
"""
try:
return int(value)
except (ValueError, TypeError):
try:
return float(value)
except (ValueError, TypeError):
try:
return str(value)
except (ValueError, TypeError):
pass
return value
def _v6address_oid2str(dotted_decimal):
hex_params = []
for dec in re.split(r'\.', dotted_decimal):
hex_params.append("%02x" % int(dec))
return ":".join(hex_params)
def _canonify_oid(oid):
"""
Our output oid's always begin with '.'.
:param oid: a thing that stringifies to a dotted oid
:return: a string like '.#.#.#...#'
"""
oid = str(oid)
return oid if oid.startswith('.') else f'.{oid}'
def walk(agent_hostname, community, base_oid):  # pragma: no cover
    """
    Walk the subtree below base_oid on an snmp agent.

    cf.
    https://stackoverflow.com/a/45001921
    http://snmplabs.com/pysnmp/docs/hlapi/asyncore/sync/manager/cmdgen/nextcmd.html
    http://snmplabs.com/pysnmp/faq/pass-custom-mib-to-manager.html
    https://github.com/etingof/pysnmp/blob/master/examples/v3arch/asyncore/manager/cmdgen/getnext-multiple-oids-and-resolve-with-mib.py
    http://snmplabs.com/pysnmp/examples/smi/manager/browsing-mib-tree.html

    :param agent_hostname: agent to query (udp port 161)
    :param community: snmp community string
    :param base_oid: dotted oid string, root of the subtree to walk
    :return: generator yielding one dict per returned var-bind, like
        {'oid': '.#.#.#...#', 'value': <simplest native type>}
    :raises SNMPWalkError: if a response carries an error indication,
        or if pysnmp raises PySnmpError while walking
    """
    # NOTE(review): this builder is not passed to nextCmd below;
    # presumably only the compile/load side effects matter - confirm
    mib_builder = builder.MibBuilder()
    # mibViewController = view.MibViewController(mibBuilder)
    compiler.addMibCompiler(
        mib_builder,
        sources=['http://mibs.snmplabs.com/asn1/@mib@'])
    # Pre-load MIB modules we expect to work with
    mib_builder.loadModules(
        'SNMPv2-MIB',
        'SNMP-COMMUNITY-MIB',
        'RFC1213-MIB')

    # lazy %-args: the message is only formatted if debug is enabled
    logger.debug("walking %s: %s", agent_hostname, base_oid)

    try:
        for (engine_error,
             pdu_error,
             error_index,
             var_binds) in nextCmd(
                SnmpEngine(),
                CommunityData(community),
                UdpTransportTarget((agent_hostname, 161)),
                ContextData(),
                ObjectType(ObjectIdentity(base_oid)),
                lexicographicMode=False,
                lookupNames=True,
                lookupValues=True):

            # cf. http://snmplabs.com/
            #       pysnmp/examples/hlapi/asyncore/sync/contents.html
            if engine_error:
                raise SNMPWalkError(
                    f'snmp response engine error indication: '
                    f'{str(engine_error)} - {agent_hostname}')
            if pdu_error:
                # error_index is 1-based; a falsy value means the agent
                # didn't point at a specific var-bind
                failed_oid = (
                    var_binds[int(error_index) - 1][0]
                    if error_index else '?')
                raise SNMPWalkError(
                    'snmp response pdu error %r at %r' % (
                        pdu_error, failed_oid))
            if error_index != 0:
                raise SNMPWalkError(
                    'sanity failure: errorIndex != 0, '
                    'but no error indication')

            # varBinds = [
            #     rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]),x[1])
            #         .resolveWithMib(mibViewController)
            #     for x in varBinds]
            for oid, val in var_binds:
                result = {
                    "oid": _canonify_oid(oid),
                    "value": _cast_snmp_value(val)
                }
                logger.debug(result)
                yield result
    except PySnmpError as e:
        # chain explicitly so the pysnmp error is kept as __cause__
        raise SNMPWalkError(
            f'snmp error communicating with {agent_hostname}: {e}') from e
def _get_router_snmp_indexes(hostname, community):
    """
    Yield the name and snmp index of each interface seen via a community.

    Walks RFC1213_MIB_IFDESC; the walked value is used as the interface
    name and the last component of the returned oid as the index.

    :param hostname: router hostname
    :param community: snmp community string (copied into each result)
    :return: generator yielding dicts like
        {'name': <walked value>, 'index': int, 'community': str}
    :raises AssertionError: if a returned oid doesn't end with '.<digits>'
    """
    index_pattern = re.compile(r'.*\.(\d+)$')
    for ifc in walk(hostname, community, RFC1213_MIB_IFDESC):
        m = index_pattern.match(ifc['oid'])
        if not m:
            # raised explicitly instead of via `assert`, so the check
            # is still enforced when running with `python -O`
            raise AssertionError(
                f'sanity failure parsing oid: {ifc["oid"]}')
        yield {
            'name': ifc['value'],
            'index': int(m.group(1)),
            'community': community
        }
def _walk_util(hostname, default_community, logical_systems, walk_handler):
"""
Run walk_handler for default_community and
:param hostname:
:param community: base community name
:param logical_systems: a list of logical-systems strings, ok if empty
:param walk_handler: a method that takes params (hostname, community)
and yields things
:return: generator yielding whatever walk_handler yields
"""
# do the default community last, in case of duplicates
communities = [
f'{ls}/default@{default_community}'
for ls in logical_systems]
communities.append(default_community)
for c in communities:
yield from walk_handler(hostname, c)
def get_router_snmp_indexes(hostname, community, logical_systems):
    """
    Yield interface names and snmp indexes from all logical systems
    and from the base community.

    Items are structured like:
        {name: <walked interface name>, index: int, community: str}

    :param hostname: router hostname
    :param community: base community name
    :param logical_systems: a list of logical-systems names
    :return: generator yielding dicts
    """
    per_community_walk = _get_router_snmp_indexes
    interfaces = _walk_util(
        hostname, community, logical_systems, per_community_walk)
    yield from interfaces
def _v6bytes(int_str_list):
assert len(int_str_list) == 16
return struct.pack('!16B', *map(int, int_str_list))
def _v4str(int_str_list):
assert len(int_str_list) == 4
return '.'.join(int_str_list)
def _get_peer_state_info(hostname, community):
    """
    Yield the local/remote addresses of each peering seen via a community.

    Walks JNX_BGP_M2_PEER_STATE and parses the part of every returned
    oid that follows that prefix as:

        <int>.<type>.<local octets>.<type>.<remote octets>

    where type '1' means 4 octets per address (ipv4) and type '2'
    means 16 octets per address (ipv6).  Both types must be equal
    (v4 peers with v4, v6 with v6).  The leading integer is ignored.

    :param hostname: router hostname
    :param community: snmp community string (copied into each result)
    :return: generator yielding dicts like
        {local: str, remote: str, oid: str, community: str}
    :raises AssertionError: if an oid doesn't have the expected layout
    """
    # address type -> (octets per address,
    #                  converter to something ip_address() accepts)
    address_formats = {'1': (4, _v4str), '2': (16, _v6bytes)}
    oid_prefix = f'.{JNX_BGP_M2_PEER_STATE}.'

    for ifc in walk(hostname, community, JNX_BGP_M2_PEER_STATE):
        oid = ifc['oid']

        # the sanity checks below raise explicitly instead of using
        # `assert`, so they are still enforced under `python -O`
        if not oid.startswith(oid_prefix):
            raise AssertionError(f'{oid}: {JNX_BGP_M2_PEER_STATE}')

        splits = oid[len(oid_prefix):].split('.')
        splits.pop(0)  # no idea what this integer is ...

        addr_type = splits[0] if splits else None
        if addr_type not in address_formats:
            logger.error(
                f'expected v4 or v6 peering, got type {addr_type}')
            raise AssertionError(
                f'unsupported peering address type in oid: {oid}')

        num_octets, to_address = address_formats[addr_type]
        # layout: type, num_octets octets, type, num_octets octets
        remote_type_idx = num_octets + 1
        # check the length before indexing, so that a short oid is
        # reported as a layout error rather than an IndexError
        if (len(splits) != 2 * remote_type_idx
                or splits[remote_type_idx] != addr_type):
            logger.error(
                f'unexpected peering oid layout for type {addr_type}')
            raise AssertionError(
                f'sanity failure parsing peering oid: {oid}')

        local = ipaddress.ip_address(
            to_address(splits[1:remote_type_idx]))
        remote = ipaddress.ip_address(
            to_address(splits[remote_type_idx + 1:]))

        yield {
            'local': local.exploded,
            'remote': remote.exploded,
            'oid': oid,
            'community': community
        }
def get_peer_state_info(hostname, community, logical_systems):
    """
    Yield peering info from all logical systems and from the base
    community.

    Items are structured like:
        {local: str, remote: str, oid: str, community: str}

    :param hostname: router hostname
    :param community: base community name
    :param logical_systems: a list of logical-systems names
    :return: generator yielding dicts
    """
    per_community_walk = _get_peer_state_info
    peerings = _walk_util(
        hostname, community, logical_systems, per_community_walk)
    yield from peerings