diff --git a/gso/services/ipam.py b/gso/services/ipam.py index de7539e37f9a5d9ac40a7ff98062465d8bc4628b..706c9d856d2e03ed86cd49acea181bf4a45fb9bf 100644 --- a/gso/services/ipam.py +++ b/gso/services/ipam.py @@ -1,916 +1,467 @@ -# mypy: ignore-errors import ipaddress -from enum import Enum -from typing import Optional, Tuple, Union +import re +from os import PathLike -import requests -from pydantic import BaseSettings -from requests.auth import HTTPBasicAuth +import pytest +import responses -from gso import settings +from gso.services import ipam -class V4ServiceNetwork(BaseSettings): - v4: ipaddress.IPv4Network +@responses.activate +def test_allocate_networks(data_config_filename: PathLike): + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/network.*"), + json={ + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 + "network": "10.255.255.20/32", + }, + ) + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/ipv6network.*"), + json={ + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + "network": "dead:beef::18/128", + }, + ) -class V6ServiceNetwork(BaseSettings): - v6: ipaddress.IPv6Network + service_networks = ipam.allocate_networks(service_type="TRUNK") + assert service_networks == ipam.ServiceNetworks( + v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") + ) + # should fail because this service type has networks instead of containers + with pytest.raises(AssertionError): + service_networks = ipam.allocate_networks(service_type="LO") + assert service_networks is None -class ServiceNetworks(BaseSettings): - v4: ipaddress.IPv4Network - v6: ipaddress.IPv6Network +@responses.activate +def test_allocate_host(data_config_filename: PathLike): + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/record:host$"), + json="record:host/ZG5zLmhvc3QkLm5vbl9ETlNfaG9zdF9yb290LjAuMTY4MzcwNTU4MzY3MC5nc28udGVzdA:test.lo/%20", # noqa: E501 + ) -class V4HostAddress(BaseSettings): - v4: ipaddress.IPv4Address + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/network.*10.255.255.*"), + json=[ + { + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 + "network": "10.255.255.20/32", + "network_view": "default", + } + ], + ) + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/network.*10.255.254.*"), + json=[ + { + "_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:10.255.254.20/32/default", # noqa: E501 + "network": "10.255.254.20/32", + "network_view": "default", + } + ], + ) -class V6HostAddress(BaseSettings): - v6: ipaddress.IPv6Address + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"), + json=[ + { + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + "network": "dead:beef::18/128", + "network_view": "default", + } + ], + ) + responses.add(method=responses.GET, url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), json=[]) -class HostAddresses(BaseSettings): - v4: ipaddress.IPv4Address - v6: ipaddress.IPv6Address + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/network/.*10.255.255.*?_function=next_available_ip&num=1$"), # noqa: E501 + json={"ips": ["10.255.255.20"]}, + ) + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/network/.*10.255.254.*?_function=next_available_ip&num=1$"), # noqa: E501 + body="Cannot find 1 available IP address(es) in this network", + status=400, + ) -class IPAMErrors(Enum): - # HTTP error code, match in error message - CONTAINER_FULL = 400, "Can not find requested number of networks" - NETWORK_FULL = 400, "Cannot find 1 available IP address(es) in this network" - - -REQUESTS_TIMEOUT = 20 - + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/ipv6network/.*?_function=next_available_ip&num=1$"), # noqa: E501 + json={"ips": ["dead:beef::18"]}, + ) -# TODO: remove this! -# lab infoblox cert isn't valid for the ipv4 address -# disable warnings for now -requests.packages.urllib3.disable_warnings() + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/network.*_return_fields.*"), + json={ + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 + "network": "10.255.255.20/32", + }, + ) + responses.add( + method=responses.POST, + url=re.compile(r".*/wapi.*/ipv6network.*_return_fields.*"), + json={ + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + "network": "dead:beef::18/128", + }, + ) -def match_error_code(response, error_code): - return response.status_code == error_code.value[0] and error_code.value[1] in response.text + # test host creation by IP addresses + service_hosts = ipam.allocate_host( + hostname="test", + service_type="TRUNK", + host_addresses=ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") + ), + ) + assert service_hosts == ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") + ) + # test host creation by network addresses + service_hosts = ipam.allocate_host( + hostname="test", + service_type="TRUNK", + service_networks=ipam.ServiceNetworks( + v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") + ), + ) + assert service_hosts == ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") + ) -def wapi(infoblox_params: settings.InfoBloxParams): - return f"https://{infoblox_params.host}" f"/wapi/{infoblox_params.wapi_version}" + # test host creation by just service_type when service cfg uses networks + service_hosts = ipam.allocate_host(hostname="test", service_type="LO") + assert service_hosts == ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") + ) + # test host creation by just service_type when service cfg uses containers + service_hosts = ipam.allocate_host(hostname="test", service_type="TRUNK") + assert service_hosts == ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") + ) -def ip_addr_version(addr: str = ""): - ip_version = None - ip_addr = ipaddress.ip_address(addr) - if isinstance(ip_addr, ipaddress.IPv4Address): - ip_version = 4 - elif isinstance(ip_addr, ipaddress.IPv6Address): - ip_version = 6 - assert ip_version in [4, 6] - return ip_version - - -def ip_network_version(network: str = ""): - ip_version = None - ip_network = ipaddress.ip_network(network) - if isinstance(ip_network, ipaddress.IPv4Network): - ip_version = 4 - elif isinstance(ip_network, ipaddress.IPv6Network): - ip_version = 6 - assert ip_version in [4, 6] - return ip_version - - -def assert_host_in_service( - ipv4_addr: str = "", - ipv6_addr: str = "", -<<<<<<< HEAD - oss_ipv4_containers=None, - oss_ipv6_containers=None, - oss_ipv4_networks=None, - oss_ipv6_networks=None, -======= - oss_ipv4_containers = None, - oss_ipv6_containers = None, - oss_ipv4_networks = None, - oss_ipv6_networks = None, ->>>>>>> 232046e (Fix conflicts in workflows) -): - # IPv4 - if oss_ipv4_containers: - assert any( - ipv4_addr in oss_ipv4_container for oss_ipv4_container in oss_ipv4_containers - ), "Host's IPv4 address doesn't belong to service type." - else: - assert any( - ipv4_addr in oss_ipv4_network for oss_ipv4_network in oss_ipv4_networks - ), "Host's IPv4 address doesn't belong to service type." - - # IPv6 - if oss_ipv6_containers: - assert any( - ipv6_addr in oss_ipv6_container for oss_ipv6_container in oss_ipv6_containers - ), "Host's IPv6 address doesn't belong to service type." - else: - assert any( - ipv6_addr in oss_ipv6_network for oss_ipv6_network in oss_ipv6_networks - ), "Host's IPv6 address doesn't belong to service type." - - -<<<<<<< HEAD -def assert_network_in_service( - ipv4_network: Optional[V4ServiceNetwork] = None, - ipv6_network: Optional[V6ServiceNetwork] = None, - oss_ipv4_containers=None, - oss_ipv6_containers=None, - oss_ipv4_networks=None, - oss_ipv6_networks=None, -): - # IPv4 - if ipv4_network: - if oss_ipv4_containers: - assert any( - ipv4_network.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers - ), "Network doesn't belong to service type." - else: - assert ipv4_network in oss_ipv4_networks, "Network doesn't belong to service type." - - # IPv6 - if ipv6_network: - if oss_ipv6_containers: - assert any( - ipv6_network.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers - ), "Network doesn't belong to service type." - else: - assert ipv6_network in oss_ipv6_networks, "Network doesn't belong to service type." - - -def find_networks(network_container: Optional[str] = "", network: Optional[str] = "", ip_version: int = 4): - """Get all networks optinally filtering by a container or by a network. - - Args: - ---- - network_container (str, optional): container to filter by - network (str, optional): network to filter by - ip_version (int): 4 or 6 - - Returns: - ------- - (list) all found networks mathing the args, which may be empty. - -======= -def find_networks(network_container: Optional[str] = "", network: Optional[str] = "", ip_version: int = 4): - """If network_container is not None, find all networks within the specified container. - Otherwise, if network is not None, find the specified network. - Otherwise find all networks. - A list of all found networks is returned (an HTTP 200 code - may be returned with an empty list). ->>>>>>> 232046e (Fix conflicts in workflows) - """ - assert ip_version in [4, 6] - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - endpoint = "network" if ip_version == 4 else "ipv6network" - params = None - if network_container: - params = {"network_container": network_container} - elif network: - params = {"network": network, "_return_fields": "comment"} - r = requests.get( - f"{wapi(infoblox_params)}/{endpoint}", - params=params, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), -<<<<<<< HEAD - verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT, -======= - verify=False, - timeout=REQUESTS_TIMEOUT ->>>>>>> 232046e (Fix conflicts in workflows) - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - return r.json() - - -def allocate_network_inner( - infoblox_params: settings.InfoBloxParams, - network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams], - ip_version: int = 4, - comment: Optional[str] = "", - extattrs: Optional[dict] = None, -) -> Union[V4ServiceNetwork, V6ServiceNetwork]: - if extattrs is None: - extattrs = {} - assert ip_version in [4, 6] - endpoint = "network" if ip_version == 4 else "ipv6network" - ip_container = "networkcontainer" if ip_version == 4 else "ipv6networkcontainer" - - assert network_params.containers, ( - "No containers available to allocate networks for this service." - "Maybe you want to allocate a host from a network directly?" - ) - - # only return in the response the allocated network, not all available - # TODO: any validation needed for extrattrs wherever it's used? - req_payload = { - "network": { - "_object_function": "next_available_network", - "_parameters": {"cidr": network_params.mask}, - "_object": ip_container, - "_object_parameters": {"network": str(network_params.containers[0])}, - "_result_field": "networks", - }, - "comment": comment, - "extattrs": extattrs, - } - - container_index = 0 - while True: - r = requests.post( - f"{wapi(infoblox_params)}/{endpoint}", - params={"_return_fields": "network"}, - json=req_payload, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - headers={"content-type": "application/json"}, -<<<<<<< HEAD - verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT, -======= - verify=False, - timeout=REQUESTS_TIMEOUT ->>>>>>> 232046e (Fix conflicts in workflows) + # test host creation that should return a no available IP error + with pytest.raises(AssertionError): + service_hosts = ipam.allocate_host( + hostname="test", + service_type="TRUNK", + service_networks=ipam.ServiceNetworks( + v4=ipaddress.ip_network("10.255.254.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") + ), ) - if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL): - break - # Container full: try with next valid container for service (if any) - container_index += 1 - if len(network_params.containers) < (container_index + 1): - break - req_payload["network"]["_object_parameters"]["network"] = str(network_params.containers[container_index]) - - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - assert "network" in r.json() - allocated_network = r.json()["network"] - if ip_version == 4: - return V4ServiceNetwork(v4=ipaddress.ip_network(allocated_network)) - return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network)) - - -<<<<<<< HEAD -def allocate_ipv4_network( - service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None -) -> V4ServiceNetwork: - """Allocate IPv4 network within the container of the specified service type. - - Args: - ---- - service_type (str): the name of the service type (e.g. "TRUNK") - comment (str, optional): a custom comment to write in the comment field in IPAM - extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"}) - - Returns: - ------- - (V4ServiceNetwork): the allocated network - - """ -======= -def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V4ServiceNetwork: - """Allocate IPv4 network within the container of the specified service type.""" ->>>>>>> 232046e (Fix conflicts in workflows) - if extattrs is None: - extattrs = {} - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs) - - -<<<<<<< HEAD -def allocate_ipv6_network( - service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None -) -> V6ServiceNetwork: - """Allocate IPv6 network within the container of the specified service type. - - Args: - ---- - service_type (str): the name of the service type (e.g. "TRUNK") - comment (str, optional): a custom comment to write in the comment field in IPAM - extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"}) - - Returns: - ------- - (V4ServiceNetwork): the allocated network - - """ -======= -def allocate_ipv6_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V6ServiceNetwork: - """Allocate IPv6 network within the container of the specified service type.""" ->>>>>>> 232046e (Fix conflicts in workflows) - if extattrs is None: - extattrs = {} - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6, comment, extattrs) - - -def allocate_networks( -<<<<<<< HEAD - service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None -======= - service_type: str = "", - comment: Optional[str] = "", - extattrs: Optional[dict] = None ->>>>>>> 232046e (Fix conflicts in workflows) -) -> ServiceNetworks: - """Allocate IPv4 and IPv6 network for the specified service type.""" - if extattrs is None: - extattrs = {} -<<<<<<< HEAD - v4_service_network = allocate_ipv4_network(service_type=service_type, comment=comment, extattrs=extattrs) - v6_service_network = allocate_ipv6_network(service_type=service_type, comment=comment, extattrs=extattrs) -======= - v4_service_network = allocate_ipv4_network( - service_type=service_type, comment=comment, extattrs=extattrs - ) - v6_service_network = allocate_ipv6_network( - service_type=service_type, comment=comment, extattrs=extattrs - ) ->>>>>>> 232046e (Fix conflicts in workflows) - return ServiceNetworks(v4=v4_service_network.v4, v6=v6_service_network.v6) - - -def find_next_available_ip(infoblox_params, network_ref: str = ""): - """Find the next available IP address from a network given its ref. - - Args: - ---- - infoblox_params (settings.InfoBloxParams): infoblox params - network_ref (str): the network to find the next available IP, in InfoBlox reference format - - Returns: - ------- - (str): next available IP in the network, or "NETWORK_FULL" if there's no space in the network - - """ - r = requests.post( - f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1", - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), -<<<<<<< HEAD - verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT, -======= - verify=False, - timeout=REQUESTS_TIMEOUT ->>>>>>> 232046e (Fix conflicts in workflows) - ) - - if match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL): - return "NETWORK_FULL" - - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert "ips" in r.json() - received_ip = r.json()["ips"] - assert len(received_ip) == 1 - return received_ip[0] - - -<<<<<<< HEAD -def allocate_host_inner( # noqa: C901 -======= -def allocate_host_inner( ->>>>>>> 232046e (Fix conflicts in workflows) - hostname: str = "", - addrs: Optional[Tuple] = None, - networks: Optional[Tuple] = None, - cname_aliases: Optional[list] = None, - dns_view: Optional[str] = "default", -<<<<<<< HEAD - extattrs: Optional[dict] = None, -======= - extattrs: Optional[dict] = None ->>>>>>> 232046e (Fix conflicts in workflows) -) -> Union[HostAddresses, str]: - # TODO: should hostnames be unique -<<<<<<< HEAD - # (fail if hostname already exists in this domain/service)? -======= - # (i.e. fail if hostname already exists in this domain/service)? ->>>>>>> 232046e (Fix conflicts in workflows) - if cname_aliases is None: - cname_aliases = [] - if extattrs is None: - extattrs = {} -<<<<<<< HEAD - assert addrs or networks, "Neither networks nor host addresses could be derived to allocate host." -======= - assert addrs or networks, "You must specify either the host addresses or the networks CIDR." ->>>>>>> 232046e (Fix conflicts in workflows) - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - # If networks isn't None, allocate host in those networks. - if networks: - ipv4_network = networks[0] - ipv6_network = networks[1] - assert ip_network_version(ipv4_network) == 4 - assert ip_network_version(ipv6_network) == 6 - - # Find the next available IP address in each network - # If requested error doesn't exist, return error - network_info = find_networks(network=ipv4_network, ip_version=4) - if len(network_info) != 1: - return "IPV4_NETWORK_NOT_FOUND" - assert "_ref" in network_info[0] - ipv4_addr = find_next_available_ip(infoblox_params, network_info[0]["_ref"]) - - network_info = find_networks(network=ipv6_network, ip_version=6) - if len(network_info) != 1: - return "IPV6_NETWORK_NOT_FOUND" - assert "_ref" in network_info[0] - ipv6_addr = find_next_available_ip(infoblox_params, network_info[0]["_ref"]) - - # If couldn't find next available IPs, return error - if ipv4_addr == "NETWORK_FULL" or ipv6_addr == "NETWORK_FULL": - if ipv4_addr == "NETWORK_FULL": - return "IPV4_NETWORK_FULL" - if ipv6_addr == "NETWORK_FULL": - return "IPV6_NETWORK_FULL" - - # Otherwise if addrs isn't None, allocate host with those addresses. - else: - ipv4_addr = addrs[0] - ipv6_addr = addrs[1] - assert ip_addr_version(ipv4_addr) == 4 - assert ip_addr_version(ipv6_addr) == 6 - - # hostname parameter must be full name including domain name. - req_payload = { - "ipv4addrs": [{"ipv4addr": ipv4_addr}], - "ipv6addrs": [{"ipv6addr": ipv6_addr}], - "name": hostname, - "configure_for_dns": True, - "view": dns_view, - "extattrs": extattrs, - } - - r = requests.post( - f"{wapi(infoblox_params)}/record:host", - json=req_payload, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), -<<<<<<< HEAD - verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT, -======= - verify=False, - timeout=REQUESTS_TIMEOUT ->>>>>>> 232046e (Fix conflicts in workflows) - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert isinstance(r.json(), str) - assert r.json().startswith("record:host/") - - if cname_aliases: - cname_req_payload = {"name": "", "canonical": hostname, "view": dns_view, "extattrs": extattrs} - - for alias in cname_aliases: - cname_req_payload["name"] = alias - r = requests.post( - f"{wapi(infoblox_params)}/record:cname", - json=cname_req_payload, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), -<<<<<<< HEAD - verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT, -======= - verify=False, - timeout=REQUESTS_TIMEOUT ->>>>>>> 232046e (Fix conflicts in workflows) - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert r.json().startswith("record:cname/") - - return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr)) - - -<<<<<<< HEAD -def allocate_host( # noqa: C901 - hostname: str = "", - service_type: str = "", - service_networks: Optional[ServiceNetworks] = None, - host_addresses: Optional[HostAddresses] = None, -======= -def allocate_host( - hostname: str = "", - service_type: str = "", - service_networks: Optional[ServiceNetworks] = None, - host_addresses: Optional[ServiceNetworks] = None, ->>>>>>> 232046e (Fix conflicts in workflows) - cname_aliases: Optional[list] = None, - extattrs: Optional[dict] = None, -) -> HostAddresses: - """Allocate host record with both IPv4 and IPv6 address, and respective DNS A and AAAA records. - - Args: - ---- - hostname (str): hostname of the host (without domain name, which is taken from the service type) - service_type (str): the name of the service type (e.g. "TRUNK") - service_networks (ServiceNetworks, optional): ipv4 and ipv6 network to allocate host - host_addresses (HostAddresses, optional): ipv4 and ipv6 addresses to allocate host (service_networks has precedence) - cname_aliases (list, optional): to create CNAME records in addition to the host record - extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"}) - - Returns: - ------- - (HostAddresses): ipv4 and ipv6 addresses of the allocated host - - """ - if cname_aliases is None: - cname_aliases = [] - if extattrs is None: - extattrs = {} - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - - assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers - oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers - oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks - oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks - domain_name = getattr(ipam_params, service_type).domain_name - dns_view = getattr(ipam_params, service_type).dns_view - - assert (oss_ipv4_containers and oss_ipv6_containers) or ( - oss_ipv4_networks and oss_ipv6_networks - ), "This service is missing either containers or networks configuration." - assert domain_name, "This service is missing domain_name configuration." - assert dns_view, "This service is missing dns_view configuration." - - if cname_aliases: - cname_aliases = [alias + domain_name for alias in cname_aliases] - - # When neither service_networks not host_addresses are provided: - # If service has configured containers, new ipv4 and ipv6 networks are created and those are used. - # Note that in this case extattrs is for the hosts and not for the networks. - # If service doesn't have configured containers and has configured networks instead, the configured - # networks are used (they are filled up in order of appearance in the configuration file). - if not service_networks and not host_addresses: - if oss_ipv4_containers and oss_ipv6_containers: - # This service has configured containers. - # Use them to allocate new networks that can allocate the hosts. - - # IPv4 - ipv4_network = str(allocate_ipv4_network(service_type=service_type).v4) - assert ipv4_network, "No available space for IPv4 networks for this service type." - - # IPv6 - ipv6_network = str(allocate_ipv6_network(service_type=service_type).v6) - assert ipv6_network, "No available space for IPv6 networks for this service type." - - elif oss_ipv4_networks and oss_ipv6_networks: - # This service has configured networks. - # Allocate a host inside an ipv4 and ipv6 network from among them. - ipv4_network = str(oss_ipv4_networks[0]) - ipv6_network = str(oss_ipv6_networks[0]) - - ipv4_network_index = 0 - ipv6_network_index = 0 - while True: - network_tuple = (ipv4_network, ipv6_network) - host = allocate_host_inner( - hostname=hostname + domain_name, - networks=network_tuple, - cname_aliases=cname_aliases, - dns_view=dns_view, - extattrs=extattrs, - ) - - if "NETWORK_FULL" not in host and "NETWORK_NOT_FOUND" not in host: - break - if "IPV4" in host: - ipv4_network_index += 1 - assert oss_ipv4_networks, "No available space in any IPv4 network for this service." - assert ipv4_network_index < len( - oss_ipv4_networks - ), "No available space in any IPv4 network for this service." - ipv4_network = str(oss_ipv4_networks[ipv4_network_index]) - else: # "IPV6" in host - ipv6_network_index += 1 - assert oss_ipv6_networks, "No available space in any IPv6 network for this service." - assert ipv6_network_index < len( - oss_ipv6_networks - ), "No available space in any IPv6 network for this service." - ipv6_network = str(oss_ipv6_networks[ipv6_network_index]) - - elif service_networks: - ipv4_network = service_networks.v4 - ipv6_network = service_networks.v6 - assert_network_in_service( - ipv4_network, ipv6_network, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks + assert service_hosts is None + + # test host creation that should return a network not exist error + with pytest.raises(AssertionError): + service_hosts = ipam.allocate_host( + hostname="test", + service_type="TRUNK", + service_networks=ipam.ServiceNetworks( + v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("beef:dead::18/128") + ), ) + assert service_hosts is None + + +@responses.activate +def test_delete_network(data_config_filename: PathLike): + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), + json=[ + { + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 + "network": "10.255.255.0/26", + "network_view": "default", + } + ], + ) + + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/network.*10.255.255.20.*"), + json=[ + { + "_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501 + "network": "100.255.255.20/32", + "network_view": "default", + } + ], + ) + + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"), + json=[ + { + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + "network": "dead:beef::18/128", + "network_view": "default", + } + ], + ) + + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), + json=[ + { + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501 + "network": "beef:dead::18/128", + "network_view": "default", + } + ], + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), + body="network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r".*/wapi.*/network.*100.255.255.*"), + body="network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501 + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"), + body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), + body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501 + ) + + service_network = ipam.delete_network(network=ipaddress.ip_network("10.255.255.0/26"), service_type="LO") + assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26")) + + with pytest.raises(AssertionError): + service_network = ipam.delete_network(network=ipaddress.ip_network("10.255.255.20/32"), service_type="LO") + assert service_network is None + + service_network = ipam.delete_network(network=ipaddress.ip_network("dead:beef::18/128"), service_type="TRUNK") + assert service_network == ipam.V6ServiceNetwork(v6=ipaddress.ip_network("dead:beef::18/128")) + + with pytest.raises(AssertionError): + service_network = ipam.delete_network(network=ipaddress.ip_network("beef:dead::18/128"), service_type="TRUNK") + assert service_network is None + + +@responses.activate +def test_delete_host(data_config_filename: PathLike): + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*record:host.*"), + json=[ + { + "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default", # noqa: E501 + "ipv4addrs": [ + { + "_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuMS40.255.255.1/ha_lo.gso/default", # noqa: E501 + "configure_for_dhcp": False, + "host": "ha_lo.gso", + "ipv4addr": "10.255.255.1", + } + ], + "ipv6addrs": [ + { + "_ref": "record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4:dead%3Abeef%3A%3A1/ha_lo.gso/default", # noqa: E501 + "configure_for_dhcp": False, + "host": "ha_lo.gso", + "ipv6addr": "dead:beef::1", + } + ], + "name": "ha_lo.gso", + "view": "default", + } + ], + ) + + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*record:cname.*"), + json=[ + { + "_ref": "record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczE:alias1.ha.gso/default", # noqa: E501 + "canonical": "hA_LO.lo", + "name": "alias1.ha.lo", + "view": "default", + }, + { + "_ref": "record:cname/5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczI:alias2.ha.gso/default", # noqa: E501 + "canonical": "hA_LO.lo", + "name": "alias2.ha.lo", + "view": "default", + }, + ], + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r".*/wapi.*record:host.*"), + body="record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYl9sbw:hb_lo.gso/default", # noqa: E501 + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r".*/wapi.*record:cname.*"), + body="record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYi5hbGlhczE:alias1.hb.gso/default", # noqa: E501 + ) + + input_host_addresses = ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") + ) + host_addresses = ipam.delete_host( + hostname="ha_lo", + host_addresses=input_host_addresses, + cname_aliases=["alias1.ha", "alias2.ha"], + service_type="LO", + ) + assert host_addresses == ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") + ) - host = allocate_host_inner( - hostname=hostname + domain_name, - networks=(str(ipv4_network), str(ipv6_network)), - cname_aliases=cname_aliases, - dns_view=dns_view, - extattrs=extattrs, + # Fail because missing CNAME + with pytest.raises(AssertionError): + host_addresses = ipam.delete_host( + hostname="ha_lo", host_addresses=input_host_addresses, cname_aliases=["alias1.ha"], service_type="LO" ) - assert "NETWORK_FULL" not in host, "Network is full." - assert "NETWORK_NOT_FOUND" not in host, "Network does not exist in IPAM. Create it first." - - elif host_addresses: - ipv4_addr = host_addresses.v4 - ipv6_addr = host_addresses.v6 - assert_host_in_service( - ipv4_addr, ipv6_addr, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks + assert host_addresses is None + + # Fail because non-matching CNAME + with pytest.raises(AssertionError): + host_addresses = ipam.delete_host( + hostname="ha_lo", + host_addresses=input_host_addresses, + cname_aliases=["alias1.ha", "alias2.ha", "alias3.ha"], + service_type="LO", ) + assert host_addresses is None + + +@responses.activate +def test_validate_network(data_config_filename: PathLike): + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), + json=[ + { + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 + "network": "10.255.255.0/26", + "network_view": "default", + "comment": "the subscription id is 0123456789abcdef", + } + ], + ) - host = allocate_host_inner( - hostname=hostname + domain_name, - addrs=(str(ipv4_addr), str(ipv6_addr)), - cname_aliases=cname_aliases, - dns_view=dns_view, - extattrs=extattrs, - ) - assert "NETWORK_FULL" not in host - - return host - - -def delete_network( -<<<<<<< HEAD - network: ipaddress.ip_network = None, service_type: str = "" -======= - network: Union[V4ServiceNetwork, V6ServiceNetwork] = None, - service_type: str = "" ->>>>>>> 232046e (Fix conflicts in workflows) -) -> Union[V4ServiceNetwork, V6ServiceNetwork]: - """Delete IPv4 or IPv6 network by CIDR.""" - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert ipam_params.INFOBLOX - infoblox_params = ipam_params.INFOBLOX - - assert network, "No network specified to delete." - assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - - ip_version = ip_network_version(str(network)) - - # Ensure that the network to be deleted is under the service type. - # Otherwise user isn't allowed to delete it - oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers - oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers - oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks - oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks - ipv4_network = None - ipv6_network = None - if ip_version == 4: - ipv4_network = network - else: - ipv6_network = network - assert_network_in_service( - ipv4_network, ipv6_network, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks - ) - - network_info = find_networks(network=str(network), ip_version=ip_version) - assert len(network_info) == 1, "Network to delete does not exist in IPAM." - assert "_ref" in network_info[0], "Network to delete does not exist in IPAM." - - r = requests.delete( - f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}', - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), -<<<<<<< HEAD - verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT, -======= - verify=False, - timeout=REQUESTS_TIMEOUT ->>>>>>> 232046e (Fix conflicts in workflows) - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - # Extract ipv4/ipv6 address from the network reference obtained in the - # response - r_text = r.text - network_address = ipaddress.ip_network(r_text.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) - if ip_version == 4: - return V4ServiceNetwork(v4=ipaddress.ip_network(network_address)) - return V6ServiceNetwork(v6=ipaddress.ip_network(network_address)) - - -def delete_host( - hostname: str = "", - host_addresses: HostAddresses = None, - cname_aliases: Optional[list] = None, -<<<<<<< HEAD - service_type: str = "", -) -> HostAddresses: -======= - service_type: str = "" -) -> Union[V4HostAddress, V6HostAddress]: ->>>>>>> 232046e (Fix conflicts in workflows) - """Delete host record and associated CNAME records. - - All arguments passed to this function must match together a host record in - IPAM, and all CNAME records associated to it must also be passed exactly. - """ - if cname_aliases is None: - cname_aliases = [] - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert ipam_params.INFOBLOX - infoblox_params = ipam_params.INFOBLOX - - assert host_addresses, "No host specified to delete." - assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers - oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers - oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks - oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks - domain_name = getattr(ipam_params, service_type).domain_name - dns_view = getattr(ipam_params, service_type).dns_view - ipv4_addr = str(host_addresses.v4) - ipv6_addr = str(host_addresses.v6) - - assert_host_in_service( - host_addresses.v4, - host_addresses.v6, - oss_ipv4_containers, - oss_ipv6_containers, - oss_ipv4_networks, - oss_ipv6_networks, - ) - - # Find host record reference - r = requests.get( - f"{wapi(infoblox_params)}/record:host", - params={ - "name": (hostname + domain_name).lower(), # hostnames are lowercase - "ipv4addr": ipv4_addr, - "ipv6addr": ipv6_addr, - "view": dns_view, - }, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), -<<<<<<< HEAD - verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT, -======= - verify=False, - timeout=REQUESTS_TIMEOUT ->>>>>>> 232046e (Fix conflicts in workflows) - ) - host_data = r.json() - assert len(host_data) == 1, "Host to delete does not exist in IPAM." - assert "_ref" in host_data[0], "Host to delete does not exist in IPAM." - host_ref = host_data[0]["_ref"] - - # Find CNAME records reference - r = requests.get( - f"{wapi(infoblox_params)}/record:cname", - params={ - "canonical": hostname + domain_name, - "view": dns_view, - }, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), -<<<<<<< HEAD - verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT, -======= - verify=False, - timeout=REQUESTS_TIMEOUT ->>>>>>> 232046e (Fix conflicts in workflows) - ) - cname_data = r.json() - provided_cnames = [item + domain_name for item in cname_aliases] - found_cnames = [item["name"] for item in cname_data if "name" in item] - assert provided_cnames == found_cnames, "Provided CNAME alias names don't match the ones poiting to hostname." - - # Delete the host record - r = requests.delete( - f"{wapi(infoblox_params)}/{host_ref}", - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), -<<<<<<< HEAD - verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT, -======= - verify=False, - timeout=REQUESTS_TIMEOUT ->>>>>>> 232046e (Fix conflicts in workflows) - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - # Delete the CNAME records - cname_refs = [item["_ref"] for item in cname_data if "name" in item] - for cname_ref in cname_refs: - r = requests.delete( - f"{wapi(infoblox_params)}/{cname_ref}", - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), -<<<<<<< HEAD - verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT, -======= - verify=False, - timeout=REQUESTS_TIMEOUT ->>>>>>> 232046e (Fix conflicts in workflows) + service_network = ipam.validate_network( + gso_subscription_id="0123456789abcdef", network=ipam.ipaddress.ip_network("10.255.255.0/26") + ) + assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26")) + + # Fail because non-matching subscription id + with pytest.raises(AssertionError): + service_network = ipam.validate_network( + gso_subscription_id="1a2b3c4d5e6f7890", network=ipam.ipaddress.ip_network("10.255.255.0/26") ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - return host_addresses -<<<<<<< HEAD - - -def validate_network( - gso_subscription_id: str = "", network: ipaddress.ip_network = None -) -> Union[V4ServiceNetwork, V6ServiceNetwork]: - """Validate IPv4 or IPv6 network. - - Check if the specified network exist, and, if it does, check if its comment field contains gso_subscription_id. - Returns the network if validation successful. - """ - assert network, "No network specified to validate." - - ip_version = ip_network_version(str(network)) - network_info = find_networks(network=str(network), ip_version=ip_version) - assert len(network_info) == 1 and "_ref" in network_info[0], "Network to validate not found in IPAM." - assert "comment" in network_info[0], "Network to validate does not have comment in IPAM." - assert ( - gso_subscription_id in network_info[0]["comment"] - ), "GSO subscription ID does not match the one in the comment field of the IPAM network." - - if ip_version == 4: - return V4ServiceNetwork(v4=network) - return V6ServiceNetwork(v6=network) - - -def validate_host( - hostname: str = "", - host_addresses: HostAddresses = None, - cname_aliases: Optional[list] = None, - service_type: str = "", -) -> HostAddresses: - """Validate host. - - Check if all arguments passed to this function match together a host record in - IPAM, and all CNAME records associated to it also match exactly. - Returns the host if validation successful. - """ - if cname_aliases is None: - cname_aliases = [] - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert ipam_params.INFOBLOX - infoblox_params = ipam_params.INFOBLOX - - assert hostname and host_addresses, "No host specified to validate. Either hostname or host_addresses missing." - domain_name = getattr(ipam_params, service_type).domain_name - ipv4_addr = str(host_addresses.v4) - ipv6_addr = str(host_addresses.v6) - dns_view = getattr(ipam_params, service_type).dns_view - - # Find host record reference - r = requests.get( - f"{wapi(infoblox_params)}/record:host", - params={ - "name": (hostname + domain_name).lower(), # hostnames are lowercase - "ipv4addr": ipv4_addr, - "ipv6addr": ipv6_addr, - "view": dns_view, - }, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT, - ) - host_data = r.json() - assert len(host_data) == 1, "Host to validate does not exist in IPAM." - assert "_ref" in host_data[0], "Host to validate does not exist in IPAM." - - # Find CNAME records reference - r = requests.get( - f"{wapi(infoblox_params)}/record:cname", - params={ - "canonical": hostname + domain_name, - "view": dns_view, - }, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, # noqa: S501 - timeout=REQUESTS_TIMEOUT, + assert service_network is None + + +@responses.activate +def test_validate_host(data_config_filename: PathLike): + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*record:host.*"), + json=[ + { + "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default", # noqa: E501 + "ipv4addrs": [ + { + "_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuMS40.255.255.1/ha_lo.gso/default", # noqa: E501 + "configure_for_dhcp": False, + "host": "ha_lo.gso", + "ipv4addr": "10.255.255.1", + } + ], + "ipv6addrs": [ + { + "_ref": "record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4:dead%3Abeef%3A%3A1/ha_lo.gso/default", # noqa: E501 + "configure_for_dhcp": False, + "host": "ha_lo.gso", + "ipv6addr": "dead:beef::1", + } + ], + "name": "ha_lo.gso", + "view": "default", + } + ], ) - cname_data = r.json() - provided_cnames = [item + domain_name for item in cname_aliases] - found_cnames = [item["name"] for item in cname_data if "name" in item] - assert provided_cnames == found_cnames, "Provided CNAME alias names don't match the ones poiting to hostname." - return host_addresses + responses.add( + method=responses.GET, + url=re.compile(r".*/wapi.*record:cname.*"), + json=[ + { + "_ref": "record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczE:alias1.ha.gso/default", # noqa: E501 + "canonical": "hA_LO.lo", + "name": "alias1.ha.lo", + "view": "default", + }, + { + "_ref": "record:cname/5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczI:alias2.ha.gso/default", # noqa: E501 + "canonical": "hA_LO.lo", + "name": "alias2.ha.lo", + "view": "default", + }, + ], + ) + + input_host_addresses = ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") + ) + host_addresses = ipam.validate_host( + hostname="ha_lo", + host_addresses=input_host_addresses, + cname_aliases=["alias1.ha", "alias2.ha"], + service_type="LO", + ) + assert host_addresses == ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") + ) + + # Fail because non-matching hostname + host_addresses = ipam.validate_host( + hostname="wrong_hostname", + host_addresses=input_host_addresses, + cname_aliases=["alias1.ha", "alias2.ha"], + service_type="LO", + ) + with pytest.raises(AssertionError): + host_addresses = ipam.HostAddresses( + v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") + ) + assert host_addresses is None -======= ->>>>>>> 232046e (Fix conflicts in workflows)