Skip to content
Snippets Groups Projects
Commit 5deb876b authored by Erik Reid's avatar Erik Reid
Browse files

support invalid ip address formats in request

parent f32e6405
No related branches found
No related tags found
No related merge requests found
...@@ -46,7 +46,7 @@ def get_trap_metadata(source_equipment, interface): ...@@ -46,7 +46,7 @@ def get_trap_metadata(source_equipment, interface):
return Response(result, mimetype="application/json") return Response(result, mimetype="application/json")
def ix_peering_group(ix_public_peer_info): def ix_peering_group(address, description):
""" """
TODO: this is probably the least efficient way of doing this TODO: this is probably the least efficient way of doing this
(if it's a problem, pre-compute these lists) (if it's a problem, pre-compute these lists)
...@@ -55,11 +55,7 @@ def ix_peering_group(ix_public_peer_info): ...@@ -55,11 +55,7 @@ def ix_peering_group(ix_public_peer_info):
:return: :return:
""" """
address = ix_public_peer_info['name'] protocol = type(address).__name__
protocol = type(ipaddress.ip_address(address)).__name__
description = ix_public_peer_info['description']
assert description is not None # sanity: at least empty string
keyword = description.split(' ')[0] # regex needed??? (e.g. tabs???) keyword = description.split(' ')[0] # regex needed??? (e.g. tabs???)
r = common.get_redis() r = common.get_redis()
...@@ -75,15 +71,14 @@ def ix_peering_group(ix_public_peer_info): ...@@ -75,15 +71,14 @@ def ix_peering_group(ix_public_peer_info):
yield peer['name'] yield peer['name']
def find_interfaces(address_string): def find_interfaces(address):
""" """
TODO: this is probably the least efficient way of doing this TODO: this is probably the least efficient way of doing this
(if it's a problem, pre-compute these lists) (if it's a problem, pre-compute these lists)
:param address: string representation of an address :param address: an ipaddress object
:return: :return:
""" """
address = ipaddress.ip_address(address_string)
r = common.get_redis() r = common.get_redis()
for k in r.keys('reverse_interface_addresses:*'): for k in r.keys('reverse_interface_addresses:*'):
info = r.get(k.decode('utf-8')).decode('utf-8') info = r.get(k.decode('utf-8')).decode('utf-8')
...@@ -97,6 +92,13 @@ def find_interfaces(address_string): ...@@ -97,6 +92,13 @@ def find_interfaces(address_string):
@common.require_accepts_json @common.require_accepts_json
def peer_info(address): def peer_info(address):
try:
address_obj = ipaddress.ip_address(address)
except ValueError:
return Response(
response='unable to parse %r as an ip address' % address,
status=422,
mimetype="text/html")
r = common.get_redis() r = common.get_redis()
...@@ -106,15 +108,17 @@ def peer_info(address): ...@@ -106,15 +108,17 @@ def peer_info(address):
if info: if info:
info = info.decode('utf-8') info = info.decode('utf-8')
result['ix-public-peer-info'] = json.loads(info) result['ix-public-peer-info'] = json.loads(info)
description = result['ix-public-peer-info']['description']
assert description is not None # sanity
result['ix-public-peer-group'] = list( result['ix-public-peer-group'] = list(
ix_peering_group(result['ix-public-peer-info'])) ix_peering_group(address_obj, description))
info = r.get('vpn_rr_peer:%s' % address) info = r.get('vpn_rr_peer:%s' % address)
if info: if info:
info = info.decode('utf-8') info = info.decode('utf-8')
result['vpn-rr-peer-info'] = json.loads(info) result['vpn-rr-peer-info'] = json.loads(info)
interfaces = list(find_interfaces(address)) interfaces = list(find_interfaces(address_obj))
if interfaces: if interfaces:
result['interfaces'] = interfaces result['interfaces'] = interfaces
......
...@@ -128,9 +128,15 @@ def test_peer_info( ...@@ -128,9 +128,15 @@ def test_peer_info(
assert set(response_data.keys()) == expected_response_keys assert set(response_data.keys()) == expected_response_keys
def test_peer_not_found(client_with_mocked_data): def test_peer_invalid_address(client_with_mocked_data):
rv = client_with_mocked_data.get( rv = client_with_mocked_data.get(
'/classifier/peer-info/1.2.3.4.5', '/classifier/peer-info/1.2.3.4.5',
headers=DEFAULT_REQUEST_HEADERS) headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 422
def test_peer_not_found(client_with_mocked_data):
rv = client_with_mocked_data.get(
'/classifier/peer-info/1.2.3.4',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 404 assert rv.status_code == 404
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment