# Tests for gso.services.ipam; HTTP calls are mocked with the `responses` library.
import ipaddress
import re

import responses

from gso.services import ipam
@responses.activate
def test_new_service_networks(data_config_filename: PathLike):
    """Check ``ipam.new_service_networks`` against mocked WAPI responses.

    The "TRUNK" service type must yield the IPv4 and IPv6 networks served by
    the two POST mocks; the "LO" service type must raise ``AssertionError``.

    :param data_config_filename: not referenced in the body; presumably a
        fixture requested for its side effect -- verify against conftest.
    """
    # NOTE(review): the url= and json= opener lines of both mocks were absent
    # from the copy under review. They are restored from the POST mocks in
    # test_new_service_host that return the same payloads -- confirm against
    # version control.
    responses.add(
        method=responses.POST,
        url=re.compile(r".*/wapi.*/network.*_return_fields.*"),
        json={
            "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default",  # noqa: E501
            "network": "10.255.255.20/32",
        },
    )

    responses.add(
        method=responses.POST,
        url=re.compile(r".*/wapi.*/ipv6network.*_return_fields.*"),
        json={
            "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default",  # noqa: E501
            "network": "dead:beef::18/128",
        },
    )

    service_networks = ipam.new_service_networks(service_type="TRUNK")
    assert service_networks == ipam.ServiceNetworks(
        v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128")
    )

    # should fail because this service type has networks instead of containers
    with pytest.raises(AssertionError):
        service_networks = ipam.new_service_networks(service_type="LO")
        # Only reached if no exception was raised; kept inside the block so a
        # silent success cannot be compared against the TRUNK result above.
        assert service_networks is None
def test_new_service_host(data_config_filename: PathLike):
# Exercises ipam.new_service_host against mocked WAPI endpoints: host creation
# from explicit addresses, from service networks, and from a service type alone,
# followed by two calls that are expected to raise AssertionError.
#
# NOTE(review): this copy of the function has lost its indentation and several
# lines (some mocks have no json= opener or closing brackets, some assertions are
# never closed), and it contains stray "JORGE SASIAIN" / "committed" lines that are
# not Python -- this looks like residue from a blame/commit view. Restore the
# function from version control before relying on it.
#
# POST mocks for the record:host, record:a and record:aaaa endpoints.
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/record:host$"),
json="record:host/ZG5zLmhvc3QkLm5vbl9ETlNfaG9zdF9yb290LjAuMTY4MzcwNTU4MzY3MC5nc28udGVzdA:test.lo/%20", # noqa: E501
)
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/record:a$"),
json="record:a/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default", # noqa: E501
)
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/record:aaaa$"),
json="record:aaaa/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default", # noqa: E501
)
# GET mocks for network lookups (10.255.255.*, 10.255.254.*, dead:beef IPv6).
# NOTE(review): the first and third mocks have no json= opener and none of the
# three is closed; compare with the 10.255.254 mock, which still shows `json=[` / `{`.
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/network.*10.255.255.*"),
"_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501
"network": "10.255.255.20/32",
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/network.*10.255.254.*"),
json=[
{
"_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:10.255.254.20/32/default", # noqa: E501
"network": "10.255.254.20/32",
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"),
"_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501
"network": "dead:beef::18/128",
# The lookup for the beef:dead IPv6 network answers with an empty list.
responses.add(method=responses.GET, url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), json=[])
# NOTE(review): the next two lines are not Python (blame-view residue).
JORGE SASIAIN
committed
# next_available_ip mocks: the 10.255.255 network yields 10.255.255.20, the
# 10.255.254 network answers with a plain-text error body, and any ipv6network
# yields dead:beef::18.
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/network/.*10.255.255.*?_function=next_available_ip&num=1$"), # noqa: E501
json={"ips": ["10.255.255.20"]},
)
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/network/.*10.255.254.*?_function=next_available_ip&num=1$"), # noqa: E501
body="Cannot find 1 available IP address(es) in this network",
)
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/ipv6network/.*?_function=next_available_ip&num=1$"), # noqa: E501
json={"ips": ["dead:beef::18"]},
# POST mocks for URLs containing _return_fields, one for network and one for
# ipv6network.
# NOTE(review): both payload dicts are missing their opener (presumably `json={`).
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/network.*_return_fields.*"),
"_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501
"network": "10.255.255.20/32",
},
)
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/ipv6network.*_return_fields.*"),
"_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501
"network": "dead:beef::18/128",
},
# NOTE(review): the calls below that pass host_addresses= or service_networks= show
# no hostname= or service_type= argument, while the service_type-only calls pass
# hostname="test" -- confirm whether argument lines were lost from this copy.
# test host creation by IP addresses
service_hosts = ipam.new_service_host(
host_addresses=ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18")
),
)
assert service_hosts == ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18")
# test host creation by network addresses
service_hosts = ipam.new_service_host(
service_networks=ipam.ServiceNetworks(
v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128")
),
)
assert service_hosts == ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18")
# test host creation by just service_type when service cfg uses networks
service_hosts = ipam.new_service_host(hostname="test", service_type="LO")
assert service_hosts == ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18")
# test host creation by just service_type when service cfg uses containers
service_hosts = ipam.new_service_host(hostname="test", service_type="TRUNK")
assert service_hosts == ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18")
# test host creation that should return a no available IP error
# (uses the 10.255.254 network, whose next_available_ip mock returns the error body)
with pytest.raises(AssertionError):
service_hosts = ipam.new_service_host(
service_networks=ipam.ServiceNetworks(
v4=ipaddress.ip_network("10.255.254.20/32"), v6=ipaddress.ip_network("dead:beef::18/128")
),
)
assert service_hosts is None
# NOTE(review): the next two lines are not Python (blame-view residue).
JORGE SASIAIN
committed
# test host creation that should return a network not exist error
# (uses the beef:dead IPv6 network, whose lookup mock returns an empty list)
with pytest.raises(AssertionError):
service_hosts = ipam.new_service_host(
# NOTE(review): blame-view residue inside the call, here and again three lines below.
JORGE SASIAIN
committed
service_networks=ipam.ServiceNetworks(
v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("beef:dead::18/128")
),
JORGE SASIAIN
committed
)
assert service_hosts is None
def test_delete_service_network(data_config_filename: PathLike):
# Exercises ipam.delete_service_network for two IPv4 and two IPv6 networks, with
# GET and DELETE mocks registered for each of them.
#
# NOTE(review): this copy has lost its indentation and several lines (the GET
# payloads have no json= wrapper, some calls are never closed) and contains stray
# "JORGE SASIAIN" / "committed" lines that are not Python -- residue from a
# blame/commit view. Restore the function from version control before relying on it.
#
# GET mocks for the four network lookups.
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"),
"_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501
)
# NOTE(review): the URL pattern matches 10.255.255.20 but the _ref returned names
# 100.255.255.20/32 -- presumably a deliberate mismatch; confirm.
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/network.*10.255.255.20.*"),
"_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501
)
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"),
"_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501
)
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"),
"_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501
)
# DELETE mocks; each answers with a reference string as the response body.
responses.add(
method=responses.DELETE,
url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"),
body="network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501
)
responses.add(
method=responses.DELETE,
url=re.compile(r".*/wapi.*/network.*100.255.255.*"),
body="network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501
)
responses.add(
method=responses.DELETE,
url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"),
body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501
)
responses.add(
method=responses.DELETE,
url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"),
body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501
# Deleting 10.255.255.0/26 is expected to return the same network wrapped in
# ipam.V4ServiceNetwork.
service_network = ipam.delete_service_network(network=ipaddress.ip_network("10.255.255.0/26"), service_type="LO")
assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26"))
# NOTE(review): the next two lines are not Python (blame-view residue); the same
# residue recurs between the remaining calls.
JORGE SASIAIN
committed
# NOTE(review): no assertion or pytest.raises is visible for this call in this
# copy -- confirm the expected outcome against version control.
service_network = ipam.delete_service_network(
network=ipaddress.ip_network("10.255.255.20/32"), service_type="LO"
JORGE SASIAIN
committed
)
JORGE SASIAIN
committed
# Deleting dead:beef::18/128 is expected to return the same network wrapped in
# ipam.V6ServiceNetwork.
service_network = ipam.delete_service_network(
network=ipaddress.ip_network("dead:beef::18/128"), service_type="TRUNK"
assert service_network == ipam.V6ServiceNetwork(v6=ipaddress.ip_network("dead:beef::18/128"))
JORGE SASIAIN
committed
# NOTE(review): no assertion or pytest.raises is visible for this call either.
service_network = ipam.delete_service_network(
network=ipaddress.ip_network("beef:dead::18/128"), service_type="TRUNK"
JORGE SASIAIN
committed
)
def test_delete_service_host(data_config_filename: PathLike):
# Exercises ipam.delete_service_host: one call that is expected to return the
# input host addresses, then two calls expected to raise AssertionError because
# the supplied CNAME aliases are incomplete or contain an extra entry.
#
# NOTE(review): this copy has lost its indentation and several lines. Both GET
# mocks have no url= argument and their nested JSON payloads are fragmentary
# (missing openers and closers), so the block is not valid Python as shown.
# Restore the function from version control before relying on it.
#
# GET mock whose payload describes host ha_lo.gso with one IPv4 address
# (10.255.255.1) and one IPv6 address (dead:beef::1).
responses.add(
method=responses.GET,
"_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default", # noqa: E501
"ipv4addrs": [
"_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuMS40.255.255.1/ha_lo.gso/default", # noqa: E501
"configure_for_dhcp": False,
"host": "ha_lo.gso",
"ipv4addr": "10.255.255.1",
"_ref": "record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4:dead%3Abeef%3A%3A1/ha_lo.gso/default", # noqa: E501
"configure_for_dhcp": False,
"host": "ha_lo.gso",
"ipv6addr": "dead:beef::1",
)
# GET mock whose payload lists two CNAME records, alias1.ha.lo and alias2.ha.lo,
# both with canonical name "hA_LO.lo".
responses.add(
method=responses.GET,
"_ref": "record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczE:alias1.ha.gso/default", # noqa: E501
"canonical": "hA_LO.lo",
"name": "alias1.ha.lo",
"view": "default",
"_ref": "record:cname/5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczI:alias2.ha.gso/default", # noqa: E501
"canonical": "hA_LO.lo",
"name": "alias2.ha.lo",
"view": "default",
},
],
)
# DELETE mocks for record:host and record:cname; each answers with a reference
# string as the response body.
responses.add(
method=responses.DELETE,
url=re.compile(r".*/wapi.*record:host.*"),
body="record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYl9sbw:hb_lo.gso/default", # noqa: E501
)
responses.add(
method=responses.DELETE,
url=re.compile(r".*/wapi.*record:cname.*"),
body="record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYi5hbGlhczE:alias1.hb.gso/default", # noqa: E501
)
# Addresses shared by all three delete_service_host calls below.
input_host_addresses = ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1")
)
# Both mocked aliases supplied: the call is expected to return the input addresses.
# NOTE(review): this call and the last one show no hostname= argument, while the
# middle call passes hostname="ha_lo" -- confirm whether a line was lost here.
host_addresses = ipam.delete_service_host(
host_addresses=input_host_addresses,
cname_aliases=["alias1.ha", "alias2.ha"],
service_type="LO",
)
assert host_addresses == ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1")
)
# Fail because missing CNAME
# (only alias1.ha is supplied, while the mocked payload lists two aliases)
with pytest.raises(AssertionError):
host_addresses = ipam.delete_service_host(
hostname="ha_lo", host_addresses=input_host_addresses, cname_aliases=["alias1.ha"], service_type="LO"
)
assert host_addresses is None
# Fail because non-matching CNAME
# (alias3.ha is supplied but does not appear in the mocked payload)
with pytest.raises(AssertionError):
host_addresses = ipam.delete_service_host(
host_addresses=input_host_addresses,
cname_aliases=["alias1.ha", "alias2.ha", "alias3.ha"],
service_type="LO",
)
assert host_addresses is None