Newer
Older
import pytest
import re
import responses
from gso.services import ipam
# NOTE(review): the `def` line that this decorator applies to is not visible
# in this view and indentation has been lost; presumably what follows is the
# body of a test for `ipam.new_service_networks` -- confirm against the file.
@responses.activate
# Mock: a POST whose URL matches `/wapi.../network...` answers with a JSON
# object carrying the reference and CIDR of the IPv4 network 10.255.255.20/32.
responses.add(
method=responses.POST,
url=re.compile(r'.*/wapi.*/network.*'),
json={
'_ref': 'network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default', # noqa: E501
'network': '10.255.255.20/32'
}
)
# Mock: a POST whose URL matches `/wapi.../ipv6network...` answers with a JSON
# object carrying the reference and CIDR of the IPv6 network dead:beef::18/128.
responses.add(
method=responses.POST,
url=re.compile(r'.*/wapi.*/ipv6network.*'),
json={
'_ref': 'ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default', # noqa: E501
'network': 'dead:beef::18/128'
}
)
# Ask for new networks for the 'TRUNK' service type and expect a
# ServiceNetworks value holding the same v4/v6 CIDRs the two mocks carry.
# NOTE(review): `ipaddress` is used here, but no `import ipaddress` is visible
# among the imports in this view -- confirm it is imported in the real file.
service_networks = ipam.new_service_networks(service_type='TRUNK')
assert service_networks == ipam.ServiceNetworks(
v4=ipaddress.ip_network('10.255.255.20/32'),
v6=ipaddress.ip_network('dead:beef::18/128')
)
# should fail because this service type has networks instead of containers
with pytest.raises(AssertionError):
service_networks = ipam.new_service_networks(service_type='LO')
# NOTE(review): if this assert sits inside the `with` block above, it is never
# reached once the call raises -- confirm the intended nesting.
assert service_networks is None
# NOTE(review): the decorator and `def` line for this section are not visible
# in this view and indentation has been lost; presumably what follows is the
# body of a test for `ipam.new_service_host` -- confirm against the file.
#
# Mocks for record creation: POSTs to URLs ending in `record:host`,
# `record:a` and `record:aaaa` each answer with a JSON string reference.
responses.add(
method=responses.POST,
url=re.compile(r'.*/wapi.*/record:host$'),
json='record:host/ZG5zLmhvc3QkLm5vbl9ETlNfaG9zdF9yb290LjAuMTY4MzcwNTU4MzY3MC5nc28udGVzdA:test.lo/%20' # noqa: E501
)
responses.add(
method=responses.POST,
url=re.compile(r'.*/wapi.*/record:a$'),
json='record:a/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default' # noqa: E501
)
responses.add(
method=responses.POST,
url=re.compile(r'.*/wapi.*/record:aaaa$'),
json='record:aaaa/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default' # noqa: E501
)
# Mocks for network lookup: GETs on `network` URLs mentioning 10.255.255 or
# 10.255.254, and on any `ipv6network` URL, each answer with a one-element
# JSON list describing the matching network.
# NOTE(review): the dots in the IPv4 fragments of these patterns are not
# escaped, so each one matches any character, not only a literal '.'.
responses.add(
method=responses.GET,
url=re.compile(r'.*/wapi.*/network.*10.255.255.*'),
json=[
{
"_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501
"network": "10.255.255.20/32",
"network_view": "default"
}
]
)
responses.add(
method=responses.GET,
url=re.compile(r'.*/wapi.*/network.*10.255.254.*'),
json=[
{
"_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:10.255.254.20/32/default", # noqa: E501
"network": "10.255.254.20/32",
"network_view": "default"
}
]
)
responses.add(
method=responses.GET,
url=re.compile(r'.*/wapi.*/ipv6network.*'),
json=[
{
"_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501
"network": "dead:beef::18/128",
"network_view": "default"
}
]
)
# Mocks for `_function=next_available_ip&num=1` POSTs: the 10.255.255 network
# and the IPv6 network each hand back one address, while the 10.255.254
# network answers HTTP 400 with a "Cannot find 1 available IP" body.
# NOTE(review): in these patterns the `?` before `_function` is a regex
# quantifier (it makes the preceding `.*` non-greedy), not a literal '?'.
responses.add(
method=responses.POST,
url=re.compile(r'.*/wapi.*/network/.*10.255.255.*?_function=next_available_ip&num=1$'), # noqa: E501
json={'ips': ['10.255.255.20']}
)
responses.add(
method=responses.POST,
url=re.compile(r'.*/wapi.*/network/.*10.255.254.*?_function=next_available_ip&num=1$'), # noqa: E501
body="Cannot find 1 available IP address(es) in this network",
status=400
)
responses.add(
method=responses.POST,
url=re.compile(r'.*/wapi.*/ipv6network/.*?_function=next_available_ip&num=1$'), # noqa: E501
json={'ips': ['dead:beef::18']}
)
# Mocks for POSTs whose URL contains `_return_fields`: the `network` and
# `ipv6network` variants answer with a JSON object holding the reference and
# CIDR of the IPv4 and IPv6 network respectively.
responses.add(
method=responses.POST,
url=re.compile(r'.*/wapi.*/network.*_return_fields.*'),
json={
'_ref': 'network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default', # noqa: E501
'network': '10.255.255.20/32'
}
)
responses.add(
method=responses.POST,
url=re.compile(r'.*/wapi.*/ipv6network.*_return_fields.*'),
json={
'_ref': 'ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default', # noqa: E501
'network': 'dead:beef::18/128'
}
)
# NOTE(review): `ipaddress` is used throughout the calls below, but no
# `import ipaddress` is visible among the imports in this view -- confirm it
# is imported in the real file.
#
# test host creation by IP addresses
# (explicit v4/v6 host addresses go in; the same pair is expected back)
service_hosts = ipam.new_service_host(
hostname='test',
host_addresses=ipam.HostAddresses(
v4=ipaddress.ip_address('10.255.255.20'),
v6=ipaddress.ip_address('dead:beef::18')
)
)
assert service_hosts == ipam.HostAddresses(
v4=ipaddress.ip_address('10.255.255.20'),
v6=ipaddress.ip_address('dead:beef::18')
)
# test host creation by network addresses
# (v4/v6 networks go in; the expected host addresses equal the ones served by
# the next_available_ip mocks above)
service_hosts = ipam.new_service_host(
hostname='test',
service_networks=ipam.ServiceNetworks(
v4=ipaddress.ip_network('10.255.255.20/32'),
v6=ipaddress.ip_network('dead:beef::18/128')
)
)
assert service_hosts == ipam.HostAddresses(
v4=ipaddress.ip_address('10.255.255.20'),
v6=ipaddress.ip_address('dead:beef::18')
)
# test host creation by just service_type when service cfg uses networks
service_hosts = ipam.new_service_host(
hostname='test',
service_type='LO'
)
assert service_hosts == ipam.HostAddresses(
v4=ipaddress.ip_address('10.255.255.20'),
v6=ipaddress.ip_address('dead:beef::18')
)
# test host creation by just service_type when service cfg uses containers
service_hosts = ipam.new_service_host(
hostname='test',
service_type='TRUNK'
)
assert service_hosts == ipam.HostAddresses(
v4=ipaddress.ip_address('10.255.255.20'),
v6=ipaddress.ip_address('dead:beef::18')
)
# test host creation that should return a no available IP error
# (the v4 network given here is 10.255.254.20/32, whose next_available_ip
# mock above answers HTTP 400)
with pytest.raises(AssertionError):
service_hosts = ipam.new_service_host(
hostname='test',
service_type='TRUNK',
service_networks=ipam.ServiceNetworks(
v4=ipaddress.ip_network('10.255.254.20/32'),
v6=ipaddress.ip_network('dead:beef::18/128')
)
)
# NOTE(review): if this assert sits inside the `with` block above, it is never
# reached once the call raises -- confirm the intended nesting.
assert service_hosts is None