Skip to content
Snippets Groups Projects
Commit b4eda69d authored by Erik Reid's avatar Erik Reid
Browse files

added utils for capturing bgp peer state info

parent ba4cc6a5
No related branches found
No related tags found
No related merge requests found
import ipaddress
import logging
import re
import struct
from pysnmp.hlapi import nextCmd, SnmpEngine, CommunityData, \
UdpTransportTarget, ContextData, ObjectType, ObjectIdentity
UdpTransportTarget, ContextData, ObjectType, ObjectIdentity
from pysnmp.smi import builder, compiler
# from pysnmp.smi import view, rfc1902
RFC1213_MIB_IFDESC = '1.3.6.1.2.1.2.2.1.2'
# BGP4-V2-MIB-JUNIPER::jnxBgpM2PeerState
JNX_BGP_M2_PEER_STATE = '1.3.6.1.4.1.2636.5.1.1.2.1.1.1.2'
logger = logging.getLogger(__name__)
class SNMPWalkError(ConnectionError):
pass
......@@ -35,8 +40,6 @@ def walk(agent_hostname, community, base_oid): # pragma: no cover
:return:
"""
logger = logging.getLogger(__name__)
mibBuilder = builder.MibBuilder()
# mibViewController = view.MibViewController(mibBuilder)
compiler.addMibCompiler(
......@@ -54,14 +57,14 @@ def walk(agent_hostname, community, base_oid): # pragma: no cover
pduErrorIndication,
errorIndex,
varBinds) in nextCmd(
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((agent_hostname, 161)),
ContextData(),
ObjectType(ObjectIdentity(base_oid)),
lexicographicMode=False,
lookupNames=True,
lookupValues=True):
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((agent_hostname, 161)),
ContextData(),
ObjectType(ObjectIdentity(base_oid)),
lexicographicMode=False,
lookupNames=True,
lookupValues=True):
# cf. http://snmplabs.com/
# pysnmp/examples/hlapi/asyncore/sync/contents.html
......@@ -97,3 +100,128 @@ def get_router_snmp_indexes(hostname, community):
'name': ifc['value'],
'index': int(m.group(1))
}
def _v6bytes(l):
assert len(l) == 16
return struct.pack('!16B', *map(int, l))
def _v4str(l):
assert len(l) == 4
return '.'.join(l)
def get_peer_state_info(hostname, community):
oid_prefix = f'.{JNX_BGP_M2_PEER_STATE}.'
for ifc in walk(hostname, community, JNX_BGP_M2_PEER_STATE):
assert ifc['oid'].startswith(oid_prefix), \
f'{ifc["oid"]}: {JNX_BGP_M2_PEER_STATE}'
rest = ifc['oid'][len(oid_prefix):]
splits = rest.split('.')
splits.pop(0) # no idea what this integer is ...
if splits[0] == splits[5] == '1': # v4 should peer with v4
# ipv4
assert len(splits) == 10
local = ipaddress.ip_address(_v4str(splits[1:5]))
remote = ipaddress.ip_address(_v4str(splits[6:]))
elif splits[0] == splits[17] == '2': # v6 should peer with v6
assert len(splits) == 34
local = ipaddress.ip_address(_v6bytes(splits[1:17]))
remote = ipaddress.ip_address(_v6bytes(splits[18:]))
else:
logger.error()
yield {
'local': local.exploded,
'remote': remote.exploded,
'oid': ifc['oid'][1:],
'value': ifc['value'],
}
############################
from pysnmp.hlapi import getCmd
def _construct_object_types(oids):
return [ObjectType(ObjectIdentity(oid)) for oid in oids]
def _cast_snmp_value(value):
"""
Boilerplate method for explicitly casting returned snmp value types.
Copied from the tutorial - negligible performance
impact since in our case we'll always return in the
first condition. This method is in fact totally useless
and should be removed.
:param value:
:return:
"""
try:
return int(value)
except (ValueError, TypeError):
try:
return float(value)
except (ValueError, TypeError):
try:
return str(value)
except (ValueError, TypeError):
pass
return value
def _fetch(handler):
"""
yields (oid, value) from the response handler
:param handler:
:return: a dict like {oid: value}, or None if there's any error
"""
for (error_indication, error_status, error_index, var_binds) in handler:
if error_indication or error_status:
# raise RuntimeError(
# 'Got SNMP error: {0}'.format(error_indication))
logger.error(f'SNMP error: {error_indication}')
return
for oid, value in var_binds:
yield [str(oid), _cast_snmp_value(value)]
def get(
router_hostname: str,
community_string: str,
oids) -> dict:
"""
Sends SNMP get requests for each oid and returns the results in a dict.
:param router_hostname:
:param community_string:
:param oids:
:return: a dict like {oid: value}, or None if there's any error
"""
handler = getCmd(
SnmpEngine(),
CommunityData(community_string),
UdpTransportTarget((router_hostname, 161)),
ContextData(),
*_construct_object_types(oids)
)
return {oid: val for oid, val in _fetch(handler)}
##############
if __name__ == '__main__':
# for x in get_peer_state_oids('mx1.kau.lt.geant.net', '0pBiFbD'):
# print(x)
oids = [x['oid'] for x in get_peer_state_info('mx1.kau.lt.geant.net', '0pBiFbD')]
oids = list(oids)
for i in range(0, len(oids), 3):
print('*' * 20)
for x in get('mx1.kau.lt.geant.net', '0pBiFbD', oids[i:i+3]).items():
print(x)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment