def test_todo():
    """Placeholder test: does nothing and therefore always passes.

    Original note: refactor before using new netconf api.

    :return: None
    """


# NOTE(review): everything below is disabled (commented-out) legacy test
# code, presumably kept as the starting point for the refactor mentioned
# in test_todo's docstring -- confirm before deleting.
#
# import os
# import re
#
# import pytest
#
# from inventory_provider import juniper
#
# TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
#
# ROUTER_HOSTNAMES = [
#     'mx2.ath.gr.geant.net',
#     'mx1.tal.ee.geant.net',
#     'mx2.tal.ee.geant.net',
#     'mx2.rig.lv.geant.net',
#     'mx1.kau.lt.geant.net',
#     'mx2.kau.lt.geant.net',
#     'mx2.zag.hr.geant.net',
#     'mx2.lju.si.geant.net',
#     'mx1.bud.hu.geant.net',
#     'mx1.pra.cz.geant.net',
#     'mx2.bra.sk.geant.net',
#     'mx1.lon.uk.geant.net',
#     'mx1.vie.at.geant.net',
#     'mx2.bru.be.geant.net',
#     'mx1.poz.pl.geant.net',
#     'mx1.ams.nl.geant.net',
#     'mx1.fra.de.geant.net',
#     'mx1.par.fr.geant.net',
#     'mx1.gen.ch.geant.net',
#     'mx1.mil2.it.geant.net',
#     'mx1.lis.pt.geant.net',
#     'mx2.lis.pt.geant.net',
#     'mx1.mad.es.geant.net',
#     'mx1.sof.bg.geant.net',
#     'mx1.buc.ro.geant.net',
#     'mx1.ham.de.geant.net',
#     'mx1.dub.ie.geant.net',
#     'mx1.dub2.ie.geant.net',
#     'mx1.mar.fr.geant.net',
#     'mx1.lon2.uk.geant.net',
#     'mx1.ath2.gr.geant.net',
# ]
#
# CACHE_SCHEMA = {
#     "$schema": "http://json-schema.org/draft-07/schema#",
#     "type": "object",
#     "patternProperties": {
#         "^.*\\.geant\\.net$": {
#             "type": "object",
#             "properties": {
#                 "bgp": {"type": "object"},
#                 "vrr": {"type": "object"},
#                 "interfaces": {"type": "object"},
#                 "snmp-interfaces": {
#                     "type": "array",
#                     "items": {
#                         "type": "object",
#                         "properties": {
#                             "v4Address": {"type": "string"},
#                             "v4Mask": {"type": "string"},
#                             "v4InterfaceName": {"type": "string"},
#                             "v6Address": {"type": "string"},
#                             "v6Mask": {"type": "string"},
#                             "v6InterfaceName": {"type": "string"},
#                         },
#                         "additionalProperties": False
#                     }
#                 }
#             },
#             "required": ["bgp", "vrr", "interfaces", "snmp-interfaces"],
#             "additionalProperties": False
#         }
#     },
#     "additionalProperties": False
# }
#
#
# def _parsed_old_style_output_data(s):
#     for l in s.splitlines():
#         if not l:
#             continue
#
#         m = re.match(
#             (r'^set\s+routing-instances\s+(\S+)+\s+'
#              r'protocols\s+(\S+)\s+'
#              r'group\s+(\S+)\s+'
#              r'neighbor\s+([a-f\d\.:]+)\s+'
#              r'description\s+"?([^"]+)"?\s*$'), l)
#         assert m
#
#         yield {
#             "routing-instances": m.group(1),
#             "protocols": m.group(2),
#             "group": m.group(3),
#             "neighbor": m.group(4),
#             "description": m.group(5)
#         }
#
#
# @pytest.fixture(params=ROUTER_HOSTNAMES)
# def router_output(request):
#     output = {}
#     for key in ["bgpv4", "bgpv6", "bgp", "interfaces", "vrr"]:
#         filename = os.path.join(
#             TEST_DATA_DIR,
#             "%s-%s.output" % (request.param, key))
#         with open(filename) as f:
#             output[key] = f.read()
#     return output
#
#
# def test_ipv4_neighbors(mocker, router_output):
#
#     old_v4_data = dict([
#         (x["neighbor"], x)
#         for x in
#         _parsed_old_style_output_data(router_output["bgpv4"])])
#
#     def _mocked_ssh_exec_commands(hostname, params, commands):
#         assert len(commands) == 2  # the expected number
#         return [None, router_output["bgp"]]
#
#     mocker.patch(
#         'inventory_provider.juniper.ssh_exec_commands',
#         _mocked_ssh_exec_commands)
#
#     neighbors = juniper.fetch_bgp_config(
#         None,
#         None,
#         group_expression=r'^GEANT-IX[\s-].*$')
#
#     assert len(neighbors) == len(old_v4_data)
#     for n in neighbors:
#         address = n["name"]["data"]
#         description = n["description"][0]["data"]
#         assert old_v4_data[address]["description"] == description
#
#
# def test_ipv6_neighbors(mocker, router_output):
#
#     old_v6_data = dict([
#         (x["neighbor"], x)
#         for x in
#         _parsed_old_style_output_data(router_output["bgpv6"])])
#
#     def _mocked_ssh_exec_commands(hostname, params, commands):
#         assert len(commands) == 2  # the expected number
#         return [None, router_output["bgp"]]
#
#     mocker.patch(
#         'inventory_provider.juniper.ssh_exec_commands',
#         _mocked_ssh_exec_commands)
#
#     neighbors = juniper.fetch_bgp_config(
#         None,
#         None,
#         group_expression=r'^GEANT-IXv6[\s-].*$')
#
#     assert len(neighbors) == len(old_v6_data)
#     for n in neighbors:
#         address = n["name"]["data"]
#         description = n["description"][0]["data"]
#         assert old_v6_data[address]["description"] == description
#
#
# COMMAND_HANDLERS = {
#     'bgp': juniper.fetch_bgp_config,
#     'vrr': juniper.fetch_vrr_config,
#     'interfaces': juniper.fetch_interfaces
# }
#
#
# def test_juniper_shell_output_parsing(mocker, router_output):
#     """
#     just call the correct parser for each type of shell output
#     (not a proper test ... just verifies there's no crash)
#
#     TODO: add jsonschema validation
#     :param router_output:
#     :return:
#     """
#
#     output = None
#
#     def _mocked_ssh_exec_commands(hostname, params, commands):
#         assert len(commands) == 2  # the expected number
#         return [None, output]
#
#     mocker.patch(
#         'inventory_provider.juniper.ssh_exec_commands',
#         _mocked_ssh_exec_commands)
#
#     for key in ["bgp", "vrr", "interfaces"]:
#         output = router_output[key]
#         COMMAND_HANDLERS[key](None, None)