Skip to content
Snippets Groups Projects
Verified Commit 5ac86c8b authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

update unit tests to work with Infoblox service

parent 4bf5d743
No related branches found
No related tags found
1 merge request!65Feature/add infoblox service
Pipeline #84031 passed
......@@ -57,7 +57,7 @@ def configuration_data() -> dict:
"IPAM": {
"INFOBLOX": {
"scheme": "https",
"wapi_version": "v2.12",
"wapi_version": "2.12",
"host": "10.0.0.1",
"username": "robot-user",
"password": "robot-user-password",
......
import ipaddress
import re
from os import PathLike
import pytest
import responses
from requests import codes
from gso.services import infoblox
from gso.services.infoblox import AllocationError, DeletionError
def _set_up_network_responses():
responses.add(method=responses.GET, url=re.compile(r".+/wapi/v2\.12/network\?network=10\.255\.255\.0.+"), json=[])
responses.add(method=responses.GET, url=re.compile(r".+/wapi/v2\.12/ipv6network\?network=dead%3Abeef.+"), json=[])
responses.add(
method=responses.POST,
url=re.compile(r".+/wapi/v2\.12/network.+"),
json={
"_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default",
"network": "10.255.255.20/32",
},
status=codes.CREATED,
)
responses.add(
method=responses.POST,
url=re.compile(r".+/wapi/v2\.12/ipv6network.+"),
json={
"_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default",
"network": "dead:beef::18/128",
},
status=codes.CREATED,
)
def _set_up_host_responses():
responses.add(
method=responses.GET,
url="https://10.0.0.1/wapi/v2.12/record%3Ahost?_return_fields=extattrs%2Cipv6addrs%2Cname%2Cview%2Caliases",
json=[],
)
responses.add(
method=responses.GET,
url="https://10.0.0.1/wapi/v2.12/record%3Ahost?name=test.lo.geant.net&ipv6addr=func%3Anextavailableip%3Adead%3A"
"beef%3A%3A%2F80%2Cdefault",
json=[],
)
responses.add(
method=responses.GET,
url="https://10.0.0.1/wapi/v2.12/record%3Ahost?name=test.lo.geant.net&_return_fields=extattrs%2Cipv4addrs%2Cnam"
"e%2Cview%2Caliases",
json=[],
)
responses.add(
method=responses.GET,
url="https://10.0.0.1/wapi/v2.12/record%3Ahost?name=broken&_return_fields=extattrs%2Cipv4addrs%2Cname%2Cview%2C"
"aliases",
json=[],
)
responses.add(
method=responses.GET,
url=re.compile(
r"https://10.0.0.1/wapi/v2.12/record%3Ahost\?name=broken&ipv6addr=func%3Anextavailableip%3Adead%3Abeef%3A%3"
r"A%2F80%2Cdefault.*"
),
json=[],
status=codes.BAD,
)
responses.add(
method=responses.POST,
url=re.compile(r".+/wapi/v2\.12/record%3Ahost\?_return_fields=extattrs%2Cipv6addrs.+"),
json={
"_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lm5ldC5nZWFudC5sby50ZXN0:test.lo.geant.net/default",
"ipv6addrs": [
{
"_ref": "record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQubmV0LmdlYW50LmxvLnRlc3QuZGVhZDpiZ"
"WVmOjoxLg:dead%3Abeef%3A%3A1/test.lo.geant.net/default",
"configure_for_dhcp": False,
"duid": "00:00:00:00:00:00:00:00:00:00",
"host": "test.lo.geant.net",
"ipv6addr": "dead:beef::1",
}
],
"ip": "dead:beef::1",
"name": "test.lo.geant.net",
"view": "default",
},
status=codes.CREATED,
)
responses.add(
method=responses.PUT,
url="https://10.0.0.1/wapi/v2.12/record%3Ahost/ZG5zLmhvc3QkLl9kZWZhdWx0Lm5ldC5nZWFudC5sd28udGVzdA%3Atest.lo.gea"
"nt.net/default?_return_fields=extattrs%2Cipv4addrs%2Cname%2Cview%2Caliases",
json={
"ipv4addrs": [
{
"_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQubmV0LmdlYW50Lmx3by50ZXN0LjEwLjI1N"
"S4yNTUuMTI5Lg:10.255.255.129/test.lo.geant.net/default",
"configure_for_dhcp": False,
"host": "test.lo.geant.net",
"ipv4addr": "10.255.255.129",
"mac": "00:00:00:00:00:00",
}
],
"name": "test.lo.geant.net",
"view": "default",
},
)
responses.add(
method=responses.GET,
url="https://10.0.0.1/wapi/v2.12/record%3Ahost?name=test.lo.geant.net&_return_fields=extattrs%2Cipv4addrs%2Cnam"
"e%2Cview%2Caliases",
json=[
{
"_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lm5ldC5nZWFudC5sd28udGVzdA:test.lo.geant.net/default",
"ipv4addrs": [
{
"_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQubmV0LmdlYW50Lmx3by50ZXN0LjEwL"
"jI1N"
"S4yNTUuMTI5Lg:10.255.255.129/test.lo.geant.net/default",
"configure_for_dhcp": False,
"host": "test.lo.geant.net",
"ipv4addr": "10.255.255.129",
"mac": "00:00:00:00:00:00",
}
],
"name": "test.lo.geant.net",
"view": "default",
}
],
)
@responses.activate
def test_allocate_networks(data_config_filename: PathLike):
_set_up_network_responses()
new_v4_network = infoblox.allocate_v4_network("TRUNK")
new_v6_network = infoblox.allocate_v6_network("TRUNK")
assert new_v4_network == ipaddress.IPv4Network("10.255.255.20/32")
assert new_v6_network == ipaddress.IPv6Network("dead:beef::18/128")
@responses.activate
def test_allocate_bad_network(data_config_filename: PathLike):
_set_up_network_responses()
with pytest.raises(AllocationError) as e:
infoblox.allocate_v4_network("LO")
assert e.value.args[0] == "Cannot allocate anything in [], check whether any IP space is available."
with pytest.raises(AllocationError) as e:
infoblox.allocate_v6_network("LO")
assert e.value.args[0] == "Cannot allocate anything in [], check whether any IP space is available."
@responses.activate
def test_allocate_good_host(data_config_filename: PathLike):
_set_up_host_responses()
new_host = infoblox.allocate_host("test.lo.geant.net", "LO", [], "test host")
assert new_host == (ipaddress.ip_address("10.255.255.129"), ipaddress.ip_address("dead:beef::1"))
@responses.activate
def test_allocate_bad_host(data_config_filename: PathLike):
_set_up_host_responses()
with pytest.raises(AllocationError) as e:
infoblox.allocate_host("broken", "TRUNK", [], "Unavailable host")
assert e.value.args[0] == "Cannot find 1 available IP address in networks []."
@responses.activate
def test_delete_good_network(data_config_filename: PathLike):
responses.add(
method=responses.GET,
url="https://10.0.0.1/wapi/v2.12/network?network=10.255.255.0%2F26&_return_fields=comment%2Cextattrs%2Cnetwork%"
"2Cnetwork_view",
json=[
{
"_ref": "network/ZG5zLm5ldHdvcmskNjIuNDAuOTYuMC8yNC8w:10.255.255.0/26/default",
"network": "10.255.255.0/26",
"network_view": "default",
}
],
)
responses.add(
method=responses.DELETE,
url="https://10.0.0.1/wapi/v2.12/network/ZG5zLm5ldHdvcmskNjIuNDAuOTYuMC8yNC8w%3A10.255.255.0/26/default",
json=[],
)
infoblox.delete_network(ipaddress.IPv4Network("10.255.255.0/26"))
@responses.activate
def test_delete_non_existent_network(data_config_filename: PathLike):
responses.add(
method=responses.GET,
url="https://10.0.0.1/wapi/v2.12/network?network=10.255.255.0%2F26&_return_fields=comment%2Cextattrs%2Cnetwork%"
"2Cnetwork_view",
json=[],
)
with pytest.raises(DeletionError) as e:
infoblox.delete_network(ipaddress.IPv4Network("10.255.255.0/26"))
assert e.value.args[0] == "Could not find network 10.255.255.0/26, nothing has been deleted."
@responses.activate
def test_delete_good_host(data_config_filename: PathLike):
responses.add(
method=responses.GET,
url=re.compile(
r"https://10\.0\.0\.1/wapi/v2\.12/record%3Ahost\?(?:name=ha_lo\.gso|ipv4addr=10\.255\.255\.1)?.+"
),
json=[
{
"_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default",
"ipv4addrs": [
{
"_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuM"
"S40.255.255.1/ha_lo.gso/default",
"configure_for_dhcp": False,
"host": "ha_lo.gso",
"ipv4addr": "10.255.255.1",
}
],
"ipv6addrs": [
{
"_ref": "record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4"
":dead%3Abeef%3A%3A1/ha_lo.gso/default",
"configure_for_dhcp": False,
"host": "ha_lo.gso",
"ipv6addr": "dead:beef::1",
}
],
"name": "ha_lo.gso",
"view": "default",
}
],
)
responses.add(
method=responses.DELETE,
url=re.compile(
r"https://10\.0\.0\.1/wapi/v2\.12/record%3Ahost/.+(ha_lo\.gso|dead:beef::1|10\.255\.255\.1)/default"
),
json=[],
)
infoblox.delete_host_by_fqdn("ha_lo.gso")
infoblox.delete_host_by_ip(ipaddress.IPv4Address("10.255.255.1"))
infoblox.delete_host_by_ip(ipaddress.IPv6Address("dead:beef::1"))
@responses.activate
def test_delete_bad_host(data_config_filename: PathLike):
responses.add(
method=responses.GET,
url=re.compile(r".+"),
json=[],
)
with pytest.raises(DeletionError) as e:
infoblox.delete_host_by_ip(ipaddress.IPv4Address("10.255.255.1"))
assert e.value.args[0] == "Could not find host at 10.255.255.1, nothing has been deleted."
with pytest.raises(DeletionError) as e:
infoblox.delete_host_by_ip(ipaddress.IPv6Address("dead:beef::1"))
assert e.value.args[0] == "Could not find host at dead:beef::1, nothing has been deleted."
with pytest.raises(DeletionError) as e:
infoblox.delete_host_by_fqdn("fake.host.net")
assert e.value.args[0] == "Could not find host at fake.host.net, nothing has been deleted."
import ipaddress
import re
from os import PathLike
import pytest
import responses
from gso.services import ipam
@responses.activate
def test_allocate_networks(data_config_filename: PathLike):
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/network.*"),
json={
"_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501
"network": "10.255.255.20/32",
},
)
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/ipv6network.*"),
json={
"_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501
"network": "dead:beef::18/128",
},
)
service_networks = ipam.allocate_networks(service_type="TRUNK")
assert service_networks == ipam.ServiceNetworks(
v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128")
)
# should fail because this service type has networks instead of containers
with pytest.raises(AssertionError):
service_networks = ipam.allocate_networks(service_type="LO")
assert service_networks is None
@responses.activate
def test_allocate_host(data_config_filename: PathLike):
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/record:host$"),
json="record:host/ZG5zLmhvc3QkLm5vbl9ETlNfaG9zdF9yb290LjAuMTY4MzcwNTU4MzY3MC5nc28udGVzdA:test.lo/%20", # noqa: E501
)
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/network.*10.255.255.*"),
json=[
{
"_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501
"network": "10.255.255.20/32",
"network_view": "default",
}
],
)
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/network.*10.255.254.*"),
json=[
{
"_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:10.255.254.20/32/default", # noqa: E501
"network": "10.255.254.20/32",
"network_view": "default",
}
],
)
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"),
json=[
{
"_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501
"network": "dead:beef::18/128",
"network_view": "default",
}
],
)
responses.add(method=responses.GET, url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), json=[])
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/network/.*10.255.255.*?_function=next_available_ip&num=1$"), # noqa: E501
json={"ips": ["10.255.255.20"]},
)
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/network/.*10.255.254.*?_function=next_available_ip&num=1$"), # noqa: E501
body="Cannot find 1 available IP address(es) in this network",
status=400,
)
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/ipv6network/.*?_function=next_available_ip&num=1$"), # noqa: E501
json={"ips": ["dead:beef::18"]},
)
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/network.*_return_fields.*"),
json={
"_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501
"network": "10.255.255.20/32",
},
)
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/ipv6network.*_return_fields.*"),
json={
"_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501
"network": "dead:beef::18/128",
},
)
# test host creation by IP addresses
service_hosts = ipam.allocate_host(
hostname="test",
service_type="TRUNK",
host_addresses=ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18")
),
)
assert service_hosts == ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18")
)
# test host creation by network addresses
service_hosts = ipam.allocate_host(
hostname="test",
service_type="TRUNK",
service_networks=ipam.ServiceNetworks(
v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128")
),
)
assert service_hosts == ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18")
)
# test host creation by just service_type when service cfg uses networks
service_hosts = ipam.allocate_host(hostname="test", service_type="LO")
assert service_hosts == ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18")
)
# test host creation by just service_type when service cfg uses containers
service_hosts = ipam.allocate_host(hostname="test", service_type="TRUNK")
assert service_hosts == ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18")
)
# test host creation that should return a no available IP error
with pytest.raises(AssertionError):
service_hosts = ipam.allocate_host(
hostname="test",
service_type="TRUNK",
service_networks=ipam.ServiceNetworks(
v4=ipaddress.ip_network("10.255.254.20/32"), v6=ipaddress.ip_network("dead:beef::18/128")
),
)
assert service_hosts is None
# test host creation that should return a network not exist error
with pytest.raises(AssertionError):
service_hosts = ipam.allocate_host(
hostname="test",
service_type="TRUNK",
service_networks=ipam.ServiceNetworks(
v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("beef:dead::18/128")
),
)
assert service_hosts is None
@responses.activate
def test_delete_network(data_config_filename: PathLike):
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"),
json=[
{
"_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501
"network": "10.255.255.0/26",
"network_view": "default",
}
],
)
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/network.*10.255.255.20.*"),
json=[
{
"_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501
"network": "100.255.255.20/32",
"network_view": "default",
}
],
)
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"),
json=[
{
"_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501
"network": "dead:beef::18/128",
"network_view": "default",
}
],
)
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"),
json=[
{
"_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501
"network": "beef:dead::18/128",
"network_view": "default",
}
],
)
responses.add(
method=responses.DELETE,
url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"),
body="network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501
)
responses.add(
method=responses.DELETE,
url=re.compile(r".*/wapi.*/network.*100.255.255.*"),
body="network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501
)
responses.add(
method=responses.DELETE,
url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"),
body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501
)
responses.add(
method=responses.DELETE,
url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"),
body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501
)
service_network = ipam.delete_network(network=ipaddress.ip_network("10.255.255.0/26"), service_type="LO")
assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26"))
with pytest.raises(AssertionError):
service_network = ipam.delete_network(network=ipaddress.ip_network("10.255.255.20/32"), service_type="LO")
assert service_network is None
service_network = ipam.delete_network(network=ipaddress.ip_network("dead:beef::18/128"), service_type="TRUNK")
assert service_network == ipam.V6ServiceNetwork(v6=ipaddress.ip_network("dead:beef::18/128"))
with pytest.raises(AssertionError):
service_network = ipam.delete_network(network=ipaddress.ip_network("beef:dead::18/128"), service_type="TRUNK")
assert service_network is None
@responses.activate
def test_delete_host(data_config_filename: PathLike):
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*record:host.*"),
json=[
{
"_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default", # noqa: E501
"ipv4addrs": [
{
"_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuMS40.255.255.1/ha_lo.gso/default", # noqa: E501
"configure_for_dhcp": False,
"host": "ha_lo.gso",
"ipv4addr": "10.255.255.1",
}
],
"ipv6addrs": [
{
"_ref": "record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4:dead%3Abeef%3A%3A1/ha_lo.gso/default", # noqa: E501
"configure_for_dhcp": False,
"host": "ha_lo.gso",
"ipv6addr": "dead:beef::1",
}
],
"name": "ha_lo.gso",
"view": "default",
}
],
)
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*record:cname.*"),
json=[
{
"_ref": "record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczE:alias1.ha.gso/default", # noqa: E501
"canonical": "hA_LO.lo",
"name": "alias1.ha.lo",
"view": "default",
},
{
"_ref": "record:cname/5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczI:alias2.ha.gso/default", # noqa: E501
"canonical": "hA_LO.lo",
"name": "alias2.ha.lo",
"view": "default",
},
],
)
responses.add(
method=responses.DELETE,
url=re.compile(r".*/wapi.*record:host.*"),
body="record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYl9sbw:hb_lo.gso/default", # noqa: E501
)
responses.add(
method=responses.DELETE,
url=re.compile(r".*/wapi.*record:cname.*"),
body="record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYi5hbGlhczE:alias1.hb.gso/default", # noqa: E501
)
input_host_addresses = ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1")
)
host_addresses = ipam.delete_host(
hostname="ha_lo",
host_addresses=input_host_addresses,
cname_aliases=["alias1.ha", "alias2.ha"],
service_type="LO",
)
assert host_addresses == ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1")
)
# Fail because missing CNAME
with pytest.raises(AssertionError):
host_addresses = ipam.delete_host(
hostname="ha_lo", host_addresses=input_host_addresses, cname_aliases=["alias1.ha"], service_type="LO"
)
assert host_addresses is None
# Fail because non-matching CNAME
with pytest.raises(AssertionError):
host_addresses = ipam.delete_host(
hostname="ha_lo",
host_addresses=input_host_addresses,
cname_aliases=["alias1.ha", "alias2.ha", "alias3.ha"],
service_type="LO",
)
assert host_addresses is None
@responses.activate
def test_validate_network(data_config_filename: PathLike):
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"),
json=[
{
"_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501
"network": "10.255.255.0/26",
"network_view": "default",
"comment": "the subscription id is 0123456789abcdef",
}
],
)
service_network = ipam.validate_network(
gso_subscription_id="0123456789abcdef", network=ipam.ipaddress.ip_network("10.255.255.0/26")
)
assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26"))
# Fail because non-matching subscription id
with pytest.raises(AssertionError):
service_network = ipam.validate_network(
gso_subscription_id="1a2b3c4d5e6f7890", network=ipam.ipaddress.ip_network("10.255.255.0/26")
)
assert service_network is None
@responses.activate
def test_validate_host(data_config_filename: PathLike):
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*record:host.*"),
json=[
{
"_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default", # noqa: E501
"ipv4addrs": [
{
"_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuMS40.255.255.1/ha_lo.gso/default", # noqa: E501
"configure_for_dhcp": False,
"host": "ha_lo.gso",
"ipv4addr": "10.255.255.1",
}
],
"ipv6addrs": [
{
"_ref": "record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4:dead%3Abeef%3A%3A1/ha_lo.gso/default", # noqa: E501
"configure_for_dhcp": False,
"host": "ha_lo.gso",
"ipv6addr": "dead:beef::1",
}
],
"name": "ha_lo.gso",
"view": "default",
}
],
)
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*record:cname.*"),
json=[
{
"_ref": "record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczE:alias1.ha.gso/default", # noqa: E501
"canonical": "hA_LO.lo",
"name": "alias1.ha.lo",
"view": "default",
},
{
"_ref": "record:cname/5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczI:alias2.ha.gso/default", # noqa: E501
"canonical": "hA_LO.lo",
"name": "alias2.ha.lo",
"view": "default",
},
],
)
input_host_addresses = ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1")
)
host_addresses = ipam.validate_host(
hostname="ha_lo",
host_addresses=input_host_addresses,
cname_aliases=["alias1.ha", "alias2.ha"],
service_type="LO",
)
assert host_addresses == ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1")
)
# Fail because non-matching hostname
host_addresses = ipam.validate_host(
hostname="wrong_hostname",
host_addresses=input_host_addresses,
cname_aliases=["alias1.ha", "alias2.ha"],
service_type="LO",
)
with pytest.raises(AssertionError):
host_addresses = ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1")
)
assert host_addresses is None
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment