Skip to content
Snippets Groups Projects
Commit 3968329f authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Merge branch 'feature/update-tests' into 'develop'

make more sensible unit tests, and unit test IP trunks too

See merge request !32
parents af1b11f4 c6086900
No related branches found
No related tags found
1 merge request!32make more sensible unit tests, and unit test IP trunks too
from typing import TextIO
TEST_CALLBACK_URL = 'https://fqdn.abc.xyz/api/resume'
def test_ansible_runner_run(**kwargs):
class Runner:
def __init__(self):
self.status = 'success'
self.rc = 0
self.stdout = TextIO()
return Runner()
......@@ -5,17 +5,15 @@ import jsonschema
import responses
from lso.playbook import PlaybookLaunchResponse
from test.routes import test_ansible_runner_run, TEST_CALLBACK_URL
@responses.activate
def test_router_provisioning(client):
callback_url = 'http://fqdn.xyz.abc:12345/'
responses.add(
method=responses.POST,
url=callback_url)
responses.put(url=TEST_CALLBACK_URL, status=204)
params = {
'callback': callback_url,
'callback': TEST_CALLBACK_URL,
'dry_run': True,
'verb': 'deploy',
'subscription': {
......@@ -38,15 +36,15 @@ def test_router_provisioning(client):
}
}
with patch('lso.playbook.ansible_runner.run') as _run:
with patch('lso.playbook.ansible_runner.run',
new=test_ansible_runner_run) as _run:
rv = client.post('/api/device/', json=params)
assert rv.status_code == 200
response = rv.json()
# wait a second for the run thread to finish
time.sleep(1)
_run.assert_called()
# wait two seconds for the run thread to finish
time.sleep(2)
jsonschema.validate(response, PlaybookLaunchResponse.schema())
# responses.assert_call_count(callback_url, 1)
responses.assert_call_count(TEST_CALLBACK_URL, 1)
assert response['status'] == 'ok'
......@@ -5,171 +5,162 @@ import jsonschema
import responses
from lso.playbook import PlaybookLaunchResponse
from test.routes import test_ansible_runner_run, TEST_CALLBACK_URL
_SUBSCRIPTION_OBJECT = {
'subscription_id': '0',
'description': 'IP trunk, geant_s_sid:GS-00000',
'iptrunk': {
'geant_s_sid': 'GS-00000',
'iptrunk_description': 'A description for this trunk',
'iptrunk_isis_metric': 9000,
'iptrunk_minimum_links': 1,
'iptrunk_sideA_ae_geant_a_sid': 'GA-00000',
'iptrunk_sideA_ae_iface': 'ae0',
'iptrunk_sideA_ae_members': [
'ge-0/0/0'
],
'iptrunk_sideA_ae_members_description': [
'this is the first interface on side A'
],
'iptrunk_sideA_node': {
'device_fqdn': 'rtx.city.country.geant.net',
'device_ias_lt_ipv4_network': '1.0.0.0/31',
'device_ias_lt_ipv6_network': 'dead:beef::3/126',
'device_lo_ipv4_address': '1.0.0.0',
'device_lo_ipv6_address': 'dead:beef::',
'device_lo_iso_address': '00.0000.0000.0000.0000.0000.00',
'device_role': 'p',
'device_si_ipv4_network': '0.0.1.0/31',
'device_site': {
'name': 'SiteBlock',
'label': None,
'site_city': 'City',
'site_name': 'city',
'site_tier': '1',
'site_country': 'Country',
'site_latitude': 0.0,
'site_longitude': 0.0,
'site_internal_id': 0,
'site_country_code': 'XX',
'owner_subscription_id': '0',
'site_bgp_community_id': 0,
'subscription_instance_id': '0'
},
'device_ts_address': '127.0.0.1',
'device_ts_port': 22,
'device_vendor': 'vendor',
'owner_subscription_id': '0',
'subscription_instance_id': '0'
},
'iptrunk_sideB_ae_geant_a_sid': 'GA-00002',
'iptrunk_sideB_ae_iface': 'ae0',
'iptrunk_sideB_ae_members': [
'ge-0/0/0'
],
'iptrunk_sideB_ae_members_description': [
'this is the first interface side B'
],
'iptrunk_sideB_node': {
'device_fqdn': 'rtx.town.country.geant.net',
'device_ias_lt_ipv4_network': '0.0.0.0/31',
'device_ias_lt_ipv6_network': 'deaf:beef::1/126',
'device_lo_ipv4_address': '0.0.0.0',
'device_lo_ipv6_address': 'dead:beef::2',
'device_lo_iso_address': '00.0000.0000.0000.0000.0000.00',
'device_role': 'p',
'device_si_ipv4_network': '0.1.0.0/31',
'device_site': {
'name': 'SiteBlock',
'label': None,
'site_city': 'Town',
'site_name': 'town',
'site_tier': '1',
'site_country': 'Country',
'site_latitude': 0.0,
'site_longitude': 0.0,
'site_internal_id': 1,
'site_country_code': 'xx',
'owner_subscription_id': '0',
'site_bgp_community_id': 2,
'subscription_instance_id': '0'
},
'device_ts_address': '127.0.0.2',
'device_ts_port': 22,
'device_vendor': 'vendor',
'owner_subscription_id': '0',
'subscription_instance_id': '0'
},
'iptrunk_speed': '1',
'iptrunk_type': 'Dark_fiber'
'subscription_id': '0',
'description': 'IP trunk, geant_s_sid:GS-00000',
'iptrunk': {
'geant_s_sid': 'GS-00000',
'iptrunk_description': 'A description for this trunk',
'iptrunk_isis_metric': 9000,
'iptrunk_minimum_links': 1,
'iptrunk_sideA_ae_geant_a_sid': 'GA-00000',
'iptrunk_sideA_ae_iface': 'ae0',
'iptrunk_sideA_ae_members': [
'ge-0/0/0'
],
'iptrunk_sideA_ae_members_description': [
'this is the first interface on side A'
],
'iptrunk_sideA_node': {
'device_fqdn': 'rtx.city.country.geant.net',
'device_ias_lt_ipv4_network': '1.0.0.0/31',
'device_ias_lt_ipv6_network': 'dead:beef::3/126',
'device_lo_ipv4_address': '1.0.0.0',
'device_lo_ipv6_address': 'dead:beef::',
'device_lo_iso_address': '00.0000.0000.0000.0000.0000.00',
'device_role': 'p',
'device_si_ipv4_network': '0.0.1.0/31',
'device_site': {
'name': 'SiteBlock',
'label': None,
'site_city': 'City',
'site_name': 'city',
'site_tier': '1',
'site_country': 'Country',
'site_latitude': 0.0,
'site_longitude': 0.0,
'site_internal_id': 0,
'site_country_code': 'XX',
'owner_subscription_id': '0',
'site_bgp_community_id': 0,
'subscription_instance_id': '0'
},
'status': 'provisioning'
}
'device_ts_address': '127.0.0.1',
'device_ts_port': 22,
'device_vendor': 'vendor',
'owner_subscription_id': '0',
'subscription_instance_id': '0'
},
'iptrunk_sideB_ae_geant_a_sid': 'GA-00002',
'iptrunk_sideB_ae_iface': 'ae0',
'iptrunk_sideB_ae_members': [
'ge-0/0/0'
],
'iptrunk_sideB_ae_members_description': [
'this is the first interface side B'
],
'iptrunk_sideB_node': {
'device_fqdn': 'rtx.town.country.geant.net',
'device_ias_lt_ipv4_network': '0.0.0.0/31',
'device_ias_lt_ipv6_network': 'deaf:beef::1/126',
'device_lo_ipv4_address': '0.0.0.0',
'device_lo_ipv6_address': 'dead:beef::2',
'device_lo_iso_address': '00.0000.0000.0000.0000.0000.00',
'device_role': 'p',
'device_si_ipv4_network': '0.1.0.0/31',
'device_site': {
'name': 'SiteBlock',
'label': None,
'site_city': 'Town',
'site_name': 'town',
'site_tier': '1',
'site_country': 'Country',
'site_latitude': 0.0,
'site_longitude': 0.0,
'site_internal_id': 1,
'site_country_code': 'xx',
'owner_subscription_id': '0',
'site_bgp_community_id': 2,
'subscription_instance_id': '0'
},
'device_ts_address': '127.0.0.2',
'device_ts_port': 22,
'device_vendor': 'vendor',
'owner_subscription_id': '0',
'subscription_instance_id': '0'
},
'iptrunk_speed': '1',
'iptrunk_type': 'Dark_fiber'
},
'status': 'provisioning'
}
@responses.activate
def test_ip_trunk_provisioning(client):
callback_url = 'http://fqdn.xyz.abc:12345/'
responses.add(
method=responses.POST,
url=callback_url)
responses.put(url=TEST_CALLBACK_URL, status=204)
params = {
'callback': callback_url,
'callback': TEST_CALLBACK_URL,
'dry_run': True,
'object': 'trunk_interface',
'verb': 'deploy',
'subscription': _SUBSCRIPTION_OBJECT
}
with patch('lso.playbook.ansible_runner.run') as _run:
with patch('lso.playbook.ansible_runner.run',
new=test_ansible_runner_run) as _run:
rv = client.post('/api/ip_trunk/', json=params)
assert rv.status_code == 200
response = rv.json()
# wait a second for the run thread to finish
time.sleep(1)
_run.assert_called()
jsonschema.validate(response, PlaybookLaunchResponse.schema())
# responses.assert_call_count(callback_url, 1)
responses.assert_call_count(TEST_CALLBACK_URL, 1)
assert response['status'] == 'ok'
@responses.activate
def test_ip_trunk_modification(client):
callback_url = 'http://fqdn.xyz.abc:12345/'
responses.add(
method=responses.PUT,
url=callback_url)
responses.put(url=TEST_CALLBACK_URL, status=204)
params = {
'callback': callback_url,
'callback': TEST_CALLBACK_URL,
'dry_run': True,
'verb': 'modify',
'subscription': _SUBSCRIPTION_OBJECT,
'old_subscription': _SUBSCRIPTION_OBJECT
}
with patch('lso.playbook.ansible_runner.run') as _run:
with patch('lso.playbook.ansible_runner.run',
new=test_ansible_runner_run) as _run:
rv = client.put('/api/ip_trunk/', json=params)
assert rv.status_code == 200
response = rv.json()
# wait a second for the run thread to finish
time.sleep(1)
_run.assert_called()
jsonschema.validate(response, PlaybookLaunchResponse.schema())
# responses.assert_call_count(callback_url, 1)
responses.assert_call_count(TEST_CALLBACK_URL, 1)
assert response['status'] == 'ok'
@responses.activate
def test_ip_trunk_modification(client):
callback_url = 'http://fqdn.xyz.abc:12345/'
responses.add(
method=responses.DELETE,
url=callback_url)
responses.put(url=TEST_CALLBACK_URL, status=204)
params = {
'callback': callback_url,
'callback': TEST_CALLBACK_URL,
'dry_run': True,
'verb': 'terminate',
'subscription': _SUBSCRIPTION_OBJECT
}
with patch('lso.playbook.ansible_runner.run') as _run:
with patch('lso.playbook.ansible_runner.run',
new=test_ansible_runner_run) as _run:
rv = client.request(url='/api/ip_trunk/',
method=responses.DELETE,
json=params)
......@@ -177,9 +168,8 @@ def test_ip_trunk_modification(client):
response = rv.json()
# wait a second for the run thread to finish
time.sleep(1)
_run.assert_called()
jsonschema.validate(response, PlaybookLaunchResponse.schema())
# responses.assert_call_count(callback_url, 1)
responses.assert_call_count(TEST_CALLBACK_URL, 1)
assert response['status'] == 'ok'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment