diff --git a/gso/services/ipam.py b/gso/services/ipam.py index eacb46a80e08f6c8356810e3afcac92776afb4f0..de7539e37f9a5d9ac40a7ff98062465d8bc4628b 100644 --- a/gso/services/ipam.py +++ b/gso/services/ipam.py @@ -84,10 +84,17 @@ def ip_network_version(network: str = ""): def assert_host_in_service( ipv4_addr: str = "", ipv6_addr: str = "", +<<<<<<< HEAD oss_ipv4_containers=None, oss_ipv6_containers=None, oss_ipv4_networks=None, oss_ipv6_networks=None, +======= + oss_ipv4_containers = None, + oss_ipv6_containers = None, + oss_ipv4_networks = None, + oss_ipv6_networks = None, +>>>>>>> 232046e (Fix conflicts in workflows) ): # IPv4 if oss_ipv4_containers: @@ -110,6 +117,7 @@ def assert_host_in_service( ), "Host's IPv6 address doesn't belong to service type." +<<<<<<< HEAD def assert_network_in_service( ipv4_network: Optional[V4ServiceNetwork] = None, ipv6_network: Optional[V6ServiceNetwork] = None, @@ -150,6 +158,14 @@ def find_networks(network_container: Optional[str] = "", network: Optional[str] ------- (list) all found networks mathing the args, which may be empty. +======= +def find_networks(network_container: Optional[str] = "", network: Optional[str] = "", ip_version: int = 4): + """If network_container is not None, find all networks within the specified container. + Otherwise, if network is not None, find the specified network. + Otherwise find all networks. + A list of all found networks is returned (an HTTP 200 code + may be returned with an empty list). +>>>>>>> 232046e (Fix conflicts in workflows) """ assert ip_version in [4, 6] oss = settings.load_oss_params() @@ -165,8 +181,13 @@ def find_networks(network_container: Optional[str] = "", network: Optional[str] f"{wapi(infoblox_params)}/{endpoint}", params=params, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), +<<<<<<< HEAD verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, +======= + verify=False, + timeout=REQUESTS_TIMEOUT +>>>>>>> 232046e (Fix conflicts in workflows) ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" return r.json() @@ -212,8 +233,13 @@ def allocate_network_inner( json=req_payload, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), headers={"content-type": "application/json"}, +<<<<<<< HEAD verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, +======= + verify=False, + timeout=REQUESTS_TIMEOUT +>>>>>>> 232046e (Fix conflicts in workflows) ) if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL): break @@ -232,6 +258,7 @@ def allocate_network_inner( return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network)) +<<<<<<< HEAD def allocate_ipv4_network( service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None ) -> V4ServiceNetwork: @@ -248,6 +275,10 @@ def allocate_ipv4_network( (V4ServiceNetwork): the allocated network """ +======= +def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V4ServiceNetwork: + """Allocate IPv4 network within the container of the specified service type.""" +>>>>>>> 232046e (Fix conflicts in workflows) if extattrs is None: extattrs = {} oss = settings.load_oss_params() @@ -257,6 +288,7 @@ def allocate_ipv4_network( return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs) +<<<<<<< HEAD def allocate_ipv6_network( service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None ) -> V6ServiceNetwork: @@ -273,6 +305,10 @@ def allocate_ipv6_network( (V4ServiceNetwork): the allocated network """ +======= +def allocate_ipv6_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V6ServiceNetwork: + """Allocate IPv6 network within the container of the specified service type.""" +>>>>>>> 232046e (Fix conflicts in workflows) if extattrs is None: extattrs = {} oss = settings.load_oss_params() @@ -283,13 +319,28 @@ def allocate_ipv6_network( def allocate_networks( +<<<<<<< HEAD service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None +======= + service_type: str = "", + comment: Optional[str] = "", + extattrs: Optional[dict] = None +>>>>>>> 232046e (Fix conflicts in workflows) ) -> ServiceNetworks: """Allocate IPv4 and IPv6 network for the specified service type.""" if extattrs is None: extattrs = {} +<<<<<<< HEAD v4_service_network = allocate_ipv4_network(service_type=service_type, comment=comment, extattrs=extattrs) v6_service_network = allocate_ipv6_network(service_type=service_type, comment=comment, extattrs=extattrs) +======= + v4_service_network = allocate_ipv4_network( + service_type=service_type, comment=comment, extattrs=extattrs + ) + v6_service_network = allocate_ipv6_network( + service_type=service_type, comment=comment, extattrs=extattrs + ) +>>>>>>> 232046e (Fix conflicts in workflows) return ServiceNetworks(v4=v4_service_network.v4, v6=v6_service_network.v6) @@ -309,8 +360,13 @@ def find_next_available_ip(infoblox_params, network_ref: str = ""): r = requests.post( f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1", auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), +<<<<<<< HEAD verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, +======= + verify=False, + timeout=REQUESTS_TIMEOUT +>>>>>>> 232046e (Fix conflicts in workflows) ) if match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL): @@ -323,21 +379,37 @@ def find_next_available_ip(infoblox_params, network_ref: str = ""): return received_ip[0] +<<<<<<< HEAD def allocate_host_inner( # noqa: C901 +======= +def allocate_host_inner( +>>>>>>> 232046e (Fix conflicts in workflows) hostname: str = "", addrs: Optional[Tuple] = None, networks: Optional[Tuple] = None, cname_aliases: Optional[list] = None, dns_view: Optional[str] = "default", +<<<<<<< HEAD extattrs: Optional[dict] = None, +======= + extattrs: Optional[dict] = None +>>>>>>> 232046e (Fix conflicts in workflows) ) -> Union[HostAddresses, str]: # TODO: should hostnames be unique +<<<<<<< HEAD # (fail if hostname already exists in this domain/service)? +======= + # (i.e. fail if hostname already exists in this domain/service)? +>>>>>>> 232046e (Fix conflicts in workflows) if cname_aliases is None: cname_aliases = [] if extattrs is None: extattrs = {} +<<<<<<< HEAD assert addrs or networks, "Neither networks nor host addresses could be derived to allocate host." +======= + assert addrs or networks, "You must specify either the host addresses or the networks CIDR." +>>>>>>> 232046e (Fix conflicts in workflows) oss = settings.load_oss_params() assert oss.IPAM.INFOBLOX infoblox_params = oss.IPAM.INFOBLOX @@ -391,8 +463,13 @@ def allocate_host_inner( # noqa: C901 f"{wapi(infoblox_params)}/record:host", json=req_payload, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), +<<<<<<< HEAD verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, +======= + verify=False, + timeout=REQUESTS_TIMEOUT +>>>>>>> 232046e (Fix conflicts in workflows) ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert isinstance(r.json(), str) @@ -407,8 +484,13 @@ def allocate_host_inner( # noqa: C901 f"{wapi(infoblox_params)}/record:cname", json=cname_req_payload, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), +<<<<<<< HEAD verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, +======= + verify=False, + timeout=REQUESTS_TIMEOUT +>>>>>>> 232046e (Fix conflicts in workflows) ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert r.json().startswith("record:cname/") @@ -416,11 +498,19 @@ def allocate_host_inner( # noqa: C901 return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr)) +<<<<<<< HEAD def allocate_host( # noqa: C901 hostname: str = "", service_type: str = "", service_networks: Optional[ServiceNetworks] = None, host_addresses: Optional[HostAddresses] = None, +======= +def allocate_host( + hostname: str = "", + service_type: str = "", + service_networks: Optional[ServiceNetworks] = None, + host_addresses: Optional[ServiceNetworks] = None, +>>>>>>> 232046e (Fix conflicts in workflows) cname_aliases: Optional[list] = None, extattrs: Optional[dict] = None, ) -> HostAddresses: @@ -555,7 +645,12 @@ def allocate_host( # noqa: C901 def delete_network( +<<<<<<< HEAD network: ipaddress.ip_network = None, service_type: str = "" +======= + network: Union[V4ServiceNetwork, V6ServiceNetwork] = None, + service_type: str = "" +>>>>>>> 232046e (Fix conflicts in workflows) ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: """Delete IPv4 or IPv6 network by CIDR.""" oss = settings.load_oss_params() @@ -592,8 +687,13 @@ def delete_network( r = requests.delete( f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}', auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), +<<<<<<< HEAD verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, +======= + verify=False, + timeout=REQUESTS_TIMEOUT +>>>>>>> 232046e (Fix conflicts in workflows) ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" @@ -610,8 +710,13 @@ def delete_host( hostname: str = "", host_addresses: HostAddresses = None, cname_aliases: Optional[list] = None, +<<<<<<< HEAD service_type: str = "", ) -> HostAddresses: +======= + service_type: str = "" +) -> Union[V4HostAddress, V6HostAddress]: +>>>>>>> 232046e (Fix conflicts in workflows) """Delete host record and associated CNAME records. All arguments passed to this function must match together a host record in @@ -655,8 +760,13 @@ def delete_host( "view": dns_view, }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), +<<<<<<< HEAD verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, +======= + verify=False, + timeout=REQUESTS_TIMEOUT +>>>>>>> 232046e (Fix conflicts in workflows) ) host_data = r.json() assert len(host_data) == 1, "Host to delete does not exist in IPAM." @@ -671,8 +781,13 @@ def delete_host( "view": dns_view, }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), +<<<<<<< HEAD verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, +======= + verify=False, + timeout=REQUESTS_TIMEOUT +>>>>>>> 232046e (Fix conflicts in workflows) ) cname_data = r.json() provided_cnames = [item + domain_name for item in cname_aliases] @@ -683,8 +798,13 @@ def delete_host( r = requests.delete( f"{wapi(infoblox_params)}/{host_ref}", auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), +<<<<<<< HEAD verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, +======= + verify=False, + timeout=REQUESTS_TIMEOUT +>>>>>>> 232046e (Fix conflicts in workflows) ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" @@ -694,12 +814,18 @@ def delete_host( r = requests.delete( f"{wapi(infoblox_params)}/{cname_ref}", auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), +<<<<<<< HEAD verify=False, # noqa: S501 timeout=REQUESTS_TIMEOUT, +======= + verify=False, + timeout=REQUESTS_TIMEOUT +>>>>>>> 232046e (Fix conflicts in workflows) ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" return host_addresses +<<<<<<< HEAD def validate_network( @@ -786,3 +912,5 @@ def validate_host( return host_addresses +======= +>>>>>>> 232046e (Fix conflicts in workflows) diff --git a/gso/workflows/device/create_device.py b/gso/workflows/device/create_device.py index 391b48b4a919e5a900c16c39d4e798e6227cc35b..2656c621b6755d39b40d75b2ef60fd5b87d2c71f 100644 --- a/gso/workflows/device/create_device.py +++ b/gso/workflows/device/create_device.py @@ -17,8 +17,8 @@ from gso.products.product_blocks import device as device_pb from gso.products.product_types import device from gso.products.product_types.device import DeviceInactive, DeviceProvisioning from gso.products.product_types.site import Site -from gso.services import ipam, provisioning_proxy from gso.services.provisioning_proxy import pp_interaction +from gso.services import ipam, provisioning_proxy def site_selector() -> Choice: diff --git a/gso/workflows/iptrunk/create_iptrunk.py b/gso/workflows/iptrunk/create_iptrunk.py index 2b70dc70a0562078c3fa7a08770b600d0e64bc42..271d2700f035ab337c8a35fffd145233d8a98e76 100644 --- a/gso/workflows/iptrunk/create_iptrunk.py +++ b/gso/workflows/iptrunk/create_iptrunk.py @@ -13,8 +13,8 @@ from gso.products.product_blocks import PhyPortCapacity from gso.products.product_blocks.iptrunk import IptrunkType from gso.products.product_types.device import Device from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning -from gso.services import ipam, provisioning_proxy from gso.services.provisioning_proxy import pp_interaction +from gso.services import ipam, provisioning_proxy def initial_input_form_generator(product_name: str) -> FormGenerator: diff --git a/test/test_ipam.py b/test/test_ipam.py index 3236c92f8cbb3a2d0c732d715e009124ed135c49..706c9d856d2e03ed86cd49acea181bf4a45fb9bf 100644 --- a/test/test_ipam.py +++ b/test/test_ipam.py @@ -1,466 +1,467 @@ -import ipaddress -import re -from os import PathLike - -import pytest -import responses - -from gso.services import ipam - - -@responses.activate -def test_allocate_networks(data_config_filename: PathLike): - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/network.*"), - json={ - "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 - "network": "10.255.255.20/32", - }, - ) - - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/ipv6network.*"), - json={ - "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 - "network": "dead:beef::18/128", - }, - ) - - service_networks = ipam.allocate_networks(service_type="TRUNK") - assert service_networks == ipam.ServiceNetworks( - v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") - ) - - # should fail because this service type has networks instead of containers - with pytest.raises(AssertionError): - service_networks = ipam.allocate_networks(service_type="LO") - assert service_networks is None - - -@responses.activate -def test_allocate_host(data_config_filename: PathLike): - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/record:host$"), - json="record:host/ZG5zLmhvc3QkLm5vbl9ETlNfaG9zdF9yb290LjAuMTY4MzcwNTU4MzY3MC5nc28udGVzdA:test.lo/%20", # noqa: E501 - ) - - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/network.*10.255.255.*"), - json=[ - { - "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 - "network": "10.255.255.20/32", - "network_view": "default", - } - ], - ) - - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/network.*10.255.254.*"), - json=[ - { - "_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:10.255.254.20/32/default", # noqa: E501 - "network": "10.255.254.20/32", - "network_view": "default", - } - ], - ) - - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"), - json=[ - { - "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 - "network": "dead:beef::18/128", - "network_view": "default", - } - ], - ) - - responses.add(method=responses.GET, url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), json=[]) - - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/network/.*10.255.255.*?_function=next_available_ip&num=1$"), # noqa: E501 - json={"ips": ["10.255.255.20"]}, - ) - - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/network/.*10.255.254.*?_function=next_available_ip&num=1$"), # noqa: E501 - body="Cannot find 1 available IP address(es) in this network", - status=400, - ) - - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/ipv6network/.*?_function=next_available_ip&num=1$"), # noqa: E501 - json={"ips": ["dead:beef::18"]}, - ) - - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/network.*_return_fields.*"), - json={ - "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 - "network": "10.255.255.20/32", - }, - ) - - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/ipv6network.*_return_fields.*"), - json={ - "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 - "network": "dead:beef::18/128", - }, - ) - - # test host creation by IP addresses - service_hosts = ipam.allocate_host( - hostname="test", - service_type="TRUNK", - host_addresses=ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") - ), - ) - assert service_hosts == ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") - ) - - # test host creation by network addresses - service_hosts = ipam.allocate_host( - hostname="test", - service_type="TRUNK", - service_networks=ipam.ServiceNetworks( - v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") - ), - ) - assert service_hosts == ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") - ) - - # test host creation by just service_type when service cfg uses networks - service_hosts = ipam.allocate_host(hostname="test", service_type="LO") - assert service_hosts == ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") - ) - - # test host creation by just service_type when service cfg uses containers - service_hosts = ipam.allocate_host(hostname="test", service_type="TRUNK") - assert service_hosts == ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") - ) - - # test host creation that should return a no available IP error - with pytest.raises(AssertionError): - service_hosts = ipam.allocate_host( - hostname="test", - service_type="TRUNK", - service_networks=ipam.ServiceNetworks( - v4=ipaddress.ip_network("10.255.254.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") - ), - ) - assert service_hosts is None - - # test host creation that should return a network not exist error - with pytest.raises(AssertionError): - service_hosts = ipam.allocate_host( - hostname="test", - service_type="TRUNK", - service_networks=ipam.ServiceNetworks( - v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("beef:dead::18/128") - ), - ) - assert service_hosts is None - - -@responses.activate -def test_delete_network(data_config_filename: PathLike): - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), - json=[ - { - "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 - "network": "10.255.255.0/26", - "network_view": "default", - } - ], - ) - - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/network.*10.255.255.20.*"), - json=[ - { - "_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501 - "network": "100.255.255.20/32", - "network_view": "default", - } - ], - ) - - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"), - json=[ - { - "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 - "network": "dead:beef::18/128", - "network_view": "default", - } - ], - ) - - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), - json=[ - { - "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501 - "network": "beef:dead::18/128", - "network_view": "default", - } - ], - ) - - responses.add( - method=responses.DELETE, - url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), - body="network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 - ) - - responses.add( - method=responses.DELETE, - url=re.compile(r".*/wapi.*/network.*100.255.255.*"), - body="network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501 - ) - - responses.add( - method=responses.DELETE, - url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"), - body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 - ) - - responses.add( - method=responses.DELETE, - url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), - body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501 - ) - - service_network = ipam.delete_network(network=ipaddress.ip_network("10.255.255.0/26"), service_type="LO") - assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26")) - - with pytest.raises(AssertionError): - service_network = ipam.delete_network(network=ipaddress.ip_network("10.255.255.20/32"), service_type="LO") - assert service_network is None - - service_network = ipam.delete_network(network=ipaddress.ip_network("dead:beef::18/128"), service_type="TRUNK") - assert service_network == ipam.V6ServiceNetwork(v6=ipaddress.ip_network("dead:beef::18/128")) - - with pytest.raises(AssertionError): - service_network = ipam.delete_network(network=ipaddress.ip_network("beef:dead::18/128"), service_type="TRUNK") - assert service_network is None - - -@responses.activate -def test_delete_host(data_config_filename: PathLike): - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*record:host.*"), - json=[ - { - "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default", # noqa: E501 - "ipv4addrs": [ - { - "_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuMS40.255.255.1/ha_lo.gso/default", # noqa: E501 - "configure_for_dhcp": False, - "host": "ha_lo.gso", - "ipv4addr": "10.255.255.1", - } - ], - "ipv6addrs": [ - { - "_ref": "record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4:dead%3Abeef%3A%3A1/ha_lo.gso/default", # noqa: E501 - "configure_for_dhcp": False, - "host": "ha_lo.gso", - "ipv6addr": "dead:beef::1", - } - ], - "name": "ha_lo.gso", - "view": "default", - } - ], - ) - - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*record:cname.*"), - json=[ - { - "_ref": "record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczE:alias1.ha.gso/default", # noqa: E501 - "canonical": "hA_LO.lo", - "name": "alias1.ha.lo", - "view": "default", - }, - { - "_ref": "record:cname/5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczI:alias2.ha.gso/default", # noqa: E501 - "canonical": "hA_LO.lo", - "name": "alias2.ha.lo", - "view": "default", - }, - ], - ) - - responses.add( - method=responses.DELETE, - url=re.compile(r".*/wapi.*record:host.*"), - body="record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYl9sbw:hb_lo.gso/default", # noqa: E501 - ) - - responses.add( - method=responses.DELETE, - url=re.compile(r".*/wapi.*record:cname.*"), - body="record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYi5hbGlhczE:alias1.hb.gso/default", # noqa: E501 - ) - - input_host_addresses = ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") - ) - host_addresses = ipam.delete_host( - hostname="ha_lo", - host_addresses=input_host_addresses, - cname_aliases=["alias1.ha", "alias2.ha"], - service_type="LO", - ) - assert host_addresses == ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") - ) - - # Fail because missing CNAME - with pytest.raises(AssertionError): - host_addresses = ipam.delete_host( - hostname="ha_lo", host_addresses=input_host_addresses, cname_aliases=["alias1.ha"], service_type="LO" - ) - assert host_addresses is None - - # Fail because non-matching CNAME - with pytest.raises(AssertionError): - host_addresses = ipam.delete_host( - hostname="ha_lo", - host_addresses=input_host_addresses, - cname_aliases=["alias1.ha", "alias2.ha", "alias3.ha"], - service_type="LO", - ) - assert host_addresses is None - - -@responses.activate -def test_validate_network(data_config_filename: PathLike): - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), - json=[ - { - "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 - "network": "10.255.255.0/26", - "network_view": "default", - "comment": "the subscription id is 0123456789abcdef", - } - ], - ) - - service_network = ipam.validate_network( - gso_subscription_id="0123456789abcdef", network=ipam.ipaddress.ip_network("10.255.255.0/26") - ) - assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26")) - - # Fail because non-matching subscription id - with pytest.raises(AssertionError): - service_network = ipam.validate_network( - gso_subscription_id="1a2b3c4d5e6f7890", network=ipam.ipaddress.ip_network("10.255.255.0/26") - ) - assert service_network is None - - -@responses.activate -def test_validate_host(data_config_filename: PathLike): - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*record:host.*"), - json=[ - { - "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default", # noqa: E501 - "ipv4addrs": [ - { - "_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuMS40.255.255.1/ha_lo.gso/default", # noqa: E501 - "configure_for_dhcp": False, - "host": "ha_lo.gso", - "ipv4addr": "10.255.255.1", - } - ], - "ipv6addrs": [ - { - "_ref": "record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4:dead%3Abeef%3A%3A1/ha_lo.gso/default", # noqa: E501 - "configure_for_dhcp": False, - "host": "ha_lo.gso", - "ipv6addr": "dead:beef::1", - } - ], - "name": "ha_lo.gso", - "view": "default", - } - ], - ) - - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*record:cname.*"), - json=[ - { - "_ref": "record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczE:alias1.ha.gso/default", # noqa: E501 - "canonical": "hA_LO.lo", - "name": "alias1.ha.lo", - "view": "default", - }, - { - "_ref": "record:cname/5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczI:alias2.ha.gso/default", # noqa: E501 - "canonical": "hA_LO.lo", - "name": "alias2.ha.lo", - "view": "default", - }, - ], - ) - - input_host_addresses = ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") - ) - host_addresses = ipam.validate_host( - hostname="ha_lo", - host_addresses=input_host_addresses, - cname_aliases=["alias1.ha", "alias2.ha"], - service_type="LO", - ) - assert host_addresses == ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") - ) - - # Fail because non-matching hostname - host_addresses = ipam.validate_host( - hostname="wrong_hostname", - host_addresses=input_host_addresses, - cname_aliases=["alias1.ha", "alias2.ha"], - service_type="LO", - ) - with pytest.raises(AssertionError): - host_addresses = ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") - ) - assert host_addresses is None +import ipaddress +import re +from os import PathLike + +import pytest +import responses + +from gso.services import ipam + + +@responses.activate +def test_allocate_networks(data_config_filename: PathLike): + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/network.*"), + json={ + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 + "network": "10.255.255.20/32", + }, + ) + + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/ipv6network.*"), + json={ + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + "network": "dead:beef::18/128", + }, + ) + + service_networks = ipam.allocate_networks(service_type="TRUNK") + assert service_networks == ipam.ServiceNetworks( + v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") + ) + + # should fail because this service type has networks instead of containers + with pytest.raises(AssertionError): + service_networks = ipam.allocate_networks(service_type="LO") + assert service_networks is None + + +@responses.activate +def test_allocate_host(data_config_filename: PathLike): + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/record:host$"), + json="record:host/ZG5zLmhvc3QkLm5vbl9ETlNfaG9zdF9yb290LjAuMTY4MzcwNTU4MzY3MC5nc28udGVzdA:test.lo/%20", # noqa: E501 + ) + + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/network.*10.255.255.*"), + json=[ + { + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 + "network": "10.255.255.20/32", + "network_view": "default", + } + ], + ) + + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/network.*10.255.254.*"), + json=[ + { + "_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:10.255.254.20/32/default", # noqa: E501 + "network": "10.255.254.20/32", + "network_view": "default", + } + ], + ) + + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"), + json=[ + { + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + "network": "dead:beef::18/128", + "network_view": "default", + } + ], + ) + + responses.add(method=responses.GET, url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), json=[]) + + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/network/.*10.255.255.*?_function=next_available_ip&num=1$"), # noqa: E501 + json={"ips": ["10.255.255.20"]}, + ) + + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/network/.*10.255.254.*?_function=next_available_ip&num=1$"), # noqa: E501 + body="Cannot find 1 available IP address(es) in this network", + status=400, + ) + + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/ipv6network/.*?_function=next_available_ip&num=1$"), # noqa: E501 + json={"ips": ["dead:beef::18"]}, + ) + + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/network.*_return_fields.*"), + json={ + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 + "network": "10.255.255.20/32", + }, + ) + + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/ipv6network.*_return_fields.*"), + json={ + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + "network": "dead:beef::18/128", + }, + ) + + # test host creation by IP addresses + service_hosts = ipam.allocate_host( + hostname="test", + service_type="TRUNK", + host_addresses=ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") + ), + ) + assert service_hosts == ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") + ) + + # test host creation by network addresses + service_hosts = ipam.allocate_host( + hostname="test", + service_type="TRUNK", + service_networks=ipam.ServiceNetworks( + v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") + ), + ) + assert service_hosts == ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") + ) + + # test host creation by just service_type when service cfg uses networks + service_hosts = ipam.allocate_host(hostname="test", service_type="LO") + assert service_hosts == ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") + ) + + # test host creation by just service_type when service cfg uses containers + service_hosts = ipam.allocate_host(hostname="test", service_type="TRUNK") + assert service_hosts == ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") + ) + + # test host creation that should return a no available IP error + with pytest.raises(AssertionError): + service_hosts = ipam.allocate_host( + hostname="test", + service_type="TRUNK", + service_networks=ipam.ServiceNetworks( + v4=ipaddress.ip_network("10.255.254.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") + ), + ) + assert service_hosts is None + + # test host creation that should return a network not exist error + with pytest.raises(AssertionError): + service_hosts = ipam.allocate_host( + hostname="test", + service_type="TRUNK", + service_networks=ipam.ServiceNetworks( + v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("beef:dead::18/128") + ), + ) + assert service_hosts is None + + +@responses.activate +def test_delete_network(data_config_filename: PathLike): + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), + json=[ + { + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 + "network": "10.255.255.0/26", + "network_view": "default", + } + ], + ) + + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/network.*10.255.255.20.*"), + json=[ + { + "_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501 + "network": "100.255.255.20/32", + "network_view": "default", + } + ], + ) + + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"), + json=[ + { + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + "network": "dead:beef::18/128", + "network_view": "default", + } + ], + ) + + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), + json=[ + { + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501 + "network": "beef:dead::18/128", + "network_view": "default", + } + ], + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), + body="network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r".*/wapi.*/network.*100.255.255.*"), + body="network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501 + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"), + body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), + body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501 + ) + + service_network = ipam.delete_network(network=ipaddress.ip_network("10.255.255.0/26"), service_type="LO") + assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26")) + + with pytest.raises(AssertionError): + service_network = ipam.delete_network(network=ipaddress.ip_network("10.255.255.20/32"), service_type="LO") + assert service_network is None + + service_network = ipam.delete_network(network=ipaddress.ip_network("dead:beef::18/128"), service_type="TRUNK") + assert service_network == ipam.V6ServiceNetwork(v6=ipaddress.ip_network("dead:beef::18/128")) + + with pytest.raises(AssertionError): + service_network = ipam.delete_network(network=ipaddress.ip_network("beef:dead::18/128"), service_type="TRUNK") + assert service_network is None + + +@responses.activate +def test_delete_host(data_config_filename: PathLike): + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*record:host.*"), + json=[ + { + "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default", # noqa: E501 + "ipv4addrs": [ + { + "_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuMS40.255.255.1/ha_lo.gso/default", # noqa: E501 + "configure_for_dhcp": False, + "host": "ha_lo.gso", + "ipv4addr": "10.255.255.1", + } + ], + "ipv6addrs": [ + { + "_ref": "record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4:dead%3Abeef%3A%3A1/ha_lo.gso/default", # noqa: E501 + "configure_for_dhcp": False, + "host": "ha_lo.gso", + "ipv6addr": "dead:beef::1", + } + ], + "name": "ha_lo.gso", + "view": "default", + } + ], + ) + + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*record:cname.*"), + json=[ + { + "_ref": "record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczE:alias1.ha.gso/default", # noqa: E501 + "canonical": "hA_LO.lo", + "name": "alias1.ha.lo", + "view": "default", + }, + { + "_ref": "record:cname/5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczI:alias2.ha.gso/default", # noqa: E501 + "canonical": "hA_LO.lo", + "name": "alias2.ha.lo", + "view": "default", + }, + ], + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r".*/wapi.*record:host.*"), + body="record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYl9sbw:hb_lo.gso/default", # noqa: E501 + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r".*/wapi.*record:cname.*"), + body="record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYi5hbGlhczE:alias1.hb.gso/default", # noqa: E501 + ) + + input_host_addresses = ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") + ) + host_addresses = ipam.delete_host( + hostname="ha_lo", + host_addresses=input_host_addresses, + cname_aliases=["alias1.ha", "alias2.ha"], + service_type="LO", + ) + assert host_addresses == ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") + ) + + # Fail because missing CNAME + with pytest.raises(AssertionError): + host_addresses = ipam.delete_host( + hostname="ha_lo", host_addresses=input_host_addresses, cname_aliases=["alias1.ha"], service_type="LO" + ) + assert host_addresses is None + + # Fail because non-matching CNAME + with pytest.raises(AssertionError): + host_addresses = ipam.delete_host( + hostname="ha_lo", + host_addresses=input_host_addresses, + cname_aliases=["alias1.ha", "alias2.ha", "alias3.ha"], + service_type="LO", + ) + assert host_addresses is None + + +@responses.activate +def test_validate_network(data_config_filename: PathLike): + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), + json=[ + { + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 + "network": "10.255.255.0/26", + "network_view": "default", + "comment": "the subscription id is 0123456789abcdef", + } + ], + ) + + service_network = ipam.validate_network( + gso_subscription_id="0123456789abcdef", network=ipam.ipaddress.ip_network("10.255.255.0/26") + ) + assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26")) + + # Fail because non-matching subscription id + with pytest.raises(AssertionError): + service_network = ipam.validate_network( + gso_subscription_id="1a2b3c4d5e6f7890", network=ipam.ipaddress.ip_network("10.255.255.0/26") + ) + assert service_network is None + + +@responses.activate +def test_validate_host(data_config_filename: PathLike): + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*record:host.*"), + json=[ + { + "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default", # noqa: E501 + "ipv4addrs": [ + { + "_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuMS40.255.255.1/ha_lo.gso/default", # noqa: E501 + "configure_for_dhcp": False, + "host": "ha_lo.gso", + "ipv4addr": "10.255.255.1", + } + ], + "ipv6addrs": [ + { + "_ref": "record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4:dead%3Abeef%3A%3A1/ha_lo.gso/default", # noqa: E501 + "configure_for_dhcp": False, + "host": "ha_lo.gso", + "ipv6addr": "dead:beef::1", + } + ], + "name": "ha_lo.gso", + "view": "default", + } + ], + ) + + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*record:cname.*"), + json=[ + { + "_ref": "record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczE:alias1.ha.gso/default", # noqa: E501 + "canonical": "hA_LO.lo", + "name": "alias1.ha.lo", + "view": "default", + }, + { + "_ref": "record:cname/5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczI:alias2.ha.gso/default", # noqa: E501 + "canonical": "hA_LO.lo", + "name": "alias2.ha.lo", + "view": "default", + }, + ], + ) + + input_host_addresses = ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") + ) + host_addresses = ipam.validate_host( + hostname="ha_lo", + host_addresses=input_host_addresses, + cname_aliases=["alias1.ha", "alias2.ha"], + service_type="LO", + ) + assert host_addresses == ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") + ) + + # Fail because non-matching hostname + host_addresses = ipam.validate_host( + hostname="wrong_hostname", + host_addresses=input_host_addresses, + cname_aliases=["alias1.ha", "alias2.ha"], + service_type="LO", + ) + with pytest.raises(AssertionError): + host_addresses = ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") + ) + assert host_addresses is None +