diff --git a/gso/services/ipam.py b/gso/services/ipam.py index 706c9d856d2e03ed86cd49acea181bf4a45fb9bf..eacb46a80e08f6c8356810e3afcac92776afb4f0 100644 --- a/gso/services/ipam.py +++ b/gso/services/ipam.py @@ -1,467 +1,788 @@ +# mypy: ignore-errors import ipaddress -import re -from os import PathLike +from enum import Enum +from typing import Optional, Tuple, Union -import pytest -import responses +import requests +from pydantic import BaseSettings +from requests.auth import HTTPBasicAuth -from gso.services import ipam +from gso import settings -@responses.activate -def test_allocate_networks(data_config_filename: PathLike): - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/network.*"), - json={ - "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 - "network": "10.255.255.20/32", - }, - ) +class V4ServiceNetwork(BaseSettings): + v4: ipaddress.IPv4Network - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/ipv6network.*"), - json={ - "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 - "network": "dead:beef::18/128", - }, - ) - service_networks = ipam.allocate_networks(service_type="TRUNK") - assert service_networks == ipam.ServiceNetworks( - v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") - ) +class V6ServiceNetwork(BaseSettings): + v6: ipaddress.IPv6Network - # should fail because this service type has networks instead of containers - with pytest.raises(AssertionError): - service_networks = ipam.allocate_networks(service_type="LO") - assert service_networks is None +class ServiceNetworks(BaseSettings): + v4: ipaddress.IPv4Network + v6: ipaddress.IPv6Network -@responses.activate -def test_allocate_host(data_config_filename: PathLike): - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/record:host$"), - json="record:host/ZG5zLmhvc3QkLm5vbl9ETlNfaG9zdF9yb290LjAuMTY4MzcwNTU4MzY3MC5nc28udGVzdA:test.lo/%20", # noqa: E501 - ) - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/network.*10.255.255.*"), - json=[ - { - "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 - "network": "10.255.255.20/32", - "network_view": "default", - } - ], - ) +class V4HostAddress(BaseSettings): + v4: ipaddress.IPv4Address - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/network.*10.255.254.*"), - json=[ - { - "_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:10.255.254.20/32/default", # noqa: E501 - "network": "10.255.254.20/32", - "network_view": "default", - } - ], - ) - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"), - json=[ - { - "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 - "network": "dead:beef::18/128", - "network_view": "default", - } - ], - ) +class V6HostAddress(BaseSettings): + v6: ipaddress.IPv6Address - responses.add(method=responses.GET, url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), json=[]) - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/network/.*10.255.255.*?_function=next_available_ip&num=1$"), # noqa: E501 - json={"ips": ["10.255.255.20"]}, - ) - - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/network/.*10.255.254.*?_function=next_available_ip&num=1$"), # noqa: E501 - body="Cannot find 1 available IP address(es) in this network", - status=400, - ) - - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/ipv6network/.*?_function=next_available_ip&num=1$"), # noqa: E501 - json={"ips": ["dead:beef::18"]}, - ) - - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/network.*_return_fields.*"), - json={ - "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 - "network": "10.255.255.20/32", - }, - ) +class HostAddresses(BaseSettings): + v4: ipaddress.IPv4Address + v6: ipaddress.IPv6Address - responses.add( - method=responses.POST, - url=re.compile(r".*/wapi.*/ipv6network.*_return_fields.*"), - json={ - "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 - "network": "dead:beef::18/128", - }, - ) - # test host creation by IP addresses - service_hosts = ipam.allocate_host( - hostname="test", - service_type="TRUNK", - host_addresses=ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") - ), - ) - assert service_hosts == ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") - ) +class IPAMErrors(Enum): + # HTTP error code, match in error message + CONTAINER_FULL = 400, "Can not find requested number of networks" + NETWORK_FULL = 400, "Cannot find 1 available IP address(es) in this network" - # test host creation by network addresses - service_hosts = ipam.allocate_host( - hostname="test", - service_type="TRUNK", - service_networks=ipam.ServiceNetworks( - v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") - ), - ) - assert service_hosts == ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") - ) - # test host creation by just service_type when service cfg uses networks - service_hosts = ipam.allocate_host(hostname="test", service_type="LO") - assert service_hosts == ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") - ) +REQUESTS_TIMEOUT = 20 - # test host creation by just service_type when service cfg uses containers - service_hosts = ipam.allocate_host(hostname="test", service_type="TRUNK") - assert service_hosts == ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") - ) - # test host creation that should return a no available IP error - with pytest.raises(AssertionError): - service_hosts = ipam.allocate_host( - hostname="test", - service_type="TRUNK", - service_networks=ipam.ServiceNetworks( - v4=ipaddress.ip_network("10.255.254.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") - ), - ) - assert service_hosts is None - - # test host creation that should return a network not exist error - with pytest.raises(AssertionError): - service_hosts = ipam.allocate_host( - hostname="test", - service_type="TRUNK", - service_networks=ipam.ServiceNetworks( - v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("beef:dead::18/128") - ), - ) - assert service_hosts is None - - -@responses.activate -def test_delete_network(data_config_filename: PathLike): - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), - json=[ - { - "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 - "network": "10.255.255.0/26", - "network_view": "default", - } - ], - ) +# TODO: remove this! +# lab infoblox cert isn't valid for the ipv4 address +# disable warnings for now +requests.packages.urllib3.disable_warnings() - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/network.*10.255.255.20.*"), - json=[ - { - "_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501 - "network": "100.255.255.20/32", - "network_view": "default", - } - ], - ) - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"), - json=[ - { - "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 - "network": "dead:beef::18/128", - "network_view": "default", - } - ], - ) +def match_error_code(response, error_code): + return response.status_code == error_code.value[0] and error_code.value[1] in response.text - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), - json=[ - { - "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501 - "network": "beef:dead::18/128", - "network_view": "default", - } - ], - ) - responses.add( - method=responses.DELETE, - url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), - body="network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 - ) +def wapi(infoblox_params: settings.InfoBloxParams): + return f"https://{infoblox_params.host}" f"/wapi/{infoblox_params.wapi_version}" - responses.add( - method=responses.DELETE, - url=re.compile(r".*/wapi.*/network.*100.255.255.*"), - body="network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501 - ) - responses.add( - method=responses.DELETE, - url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"), - body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 - ) - - responses.add( - method=responses.DELETE, - url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), - body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501 - ) - - service_network = ipam.delete_network(network=ipaddress.ip_network("10.255.255.0/26"), service_type="LO") - assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26")) - - with pytest.raises(AssertionError): - service_network = ipam.delete_network(network=ipaddress.ip_network("10.255.255.20/32"), service_type="LO") - assert service_network is None - - service_network = ipam.delete_network(network=ipaddress.ip_network("dead:beef::18/128"), service_type="TRUNK") - assert service_network == ipam.V6ServiceNetwork(v6=ipaddress.ip_network("dead:beef::18/128")) - - with pytest.raises(AssertionError): - service_network = ipam.delete_network(network=ipaddress.ip_network("beef:dead::18/128"), service_type="TRUNK") - assert service_network is None - - -@responses.activate -def test_delete_host(data_config_filename: PathLike): - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*record:host.*"), - json=[ - { - "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default", # noqa: E501 - "ipv4addrs": [ - { - "_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuMS40.255.255.1/ha_lo.gso/default", # noqa: E501 - "configure_for_dhcp": False, - "host": "ha_lo.gso", - "ipv4addr": "10.255.255.1", - } - ], - "ipv6addrs": [ - { - "_ref": "record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4:dead%3Abeef%3A%3A1/ha_lo.gso/default", # noqa: E501 - "configure_for_dhcp": False, - "host": "ha_lo.gso", - "ipv6addr": "dead:beef::1", - } - ], - "name": "ha_lo.gso", - "view": "default", - } - ], - ) - - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*record:cname.*"), - json=[ - { - "_ref": "record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczE:alias1.ha.gso/default", # noqa: E501 - "canonical": "hA_LO.lo", - "name": "alias1.ha.lo", - "view": "default", - }, - { - "_ref": "record:cname/5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczI:alias2.ha.gso/default", # noqa: E501 - "canonical": "hA_LO.lo", - "name": "alias2.ha.lo", - "view": "default", - }, - ], - ) - - responses.add( - method=responses.DELETE, - url=re.compile(r".*/wapi.*record:host.*"), - body="record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYl9sbw:hb_lo.gso/default", # noqa: E501 - ) - - responses.add( - method=responses.DELETE, - url=re.compile(r".*/wapi.*record:cname.*"), - body="record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYi5hbGlhczE:alias1.hb.gso/default", # noqa: E501 - ) - - input_host_addresses = ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") - ) - host_addresses = ipam.delete_host( - hostname="ha_lo", - host_addresses=input_host_addresses, - cname_aliases=["alias1.ha", "alias2.ha"], - service_type="LO", - ) - assert host_addresses == ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") - ) - - # Fail because missing CNAME - with pytest.raises(AssertionError): - host_addresses = ipam.delete_host( - hostname="ha_lo", host_addresses=input_host_addresses, cname_aliases=["alias1.ha"], service_type="LO" +def ip_addr_version(addr: str = ""): + ip_version = None + ip_addr = ipaddress.ip_address(addr) + if isinstance(ip_addr, ipaddress.IPv4Address): + ip_version = 4 + elif isinstance(ip_addr, ipaddress.IPv6Address): + ip_version = 6 + assert ip_version in [4, 6] + return ip_version + + +def ip_network_version(network: str = ""): + ip_version = None + ip_network = ipaddress.ip_network(network) + if isinstance(ip_network, ipaddress.IPv4Network): + ip_version = 4 + elif isinstance(ip_network, ipaddress.IPv6Network): + ip_version = 6 + assert ip_version in [4, 6] + return ip_version + + +def assert_host_in_service( + ipv4_addr: str = "", + ipv6_addr: str = "", + oss_ipv4_containers=None, + oss_ipv6_containers=None, + oss_ipv4_networks=None, + oss_ipv6_networks=None, +): + # IPv4 + if oss_ipv4_containers: + assert any( + ipv4_addr in oss_ipv4_container for oss_ipv4_container in oss_ipv4_containers + ), "Host's IPv4 address doesn't belong to service type." + else: + assert any( + ipv4_addr in oss_ipv4_network for oss_ipv4_network in oss_ipv4_networks + ), "Host's IPv4 address doesn't belong to service type." + + # IPv6 + if oss_ipv6_containers: + assert any( + ipv6_addr in oss_ipv6_container for oss_ipv6_container in oss_ipv6_containers + ), "Host's IPv6 address doesn't belong to service type." + else: + assert any( + ipv6_addr in oss_ipv6_network for oss_ipv6_network in oss_ipv6_networks + ), "Host's IPv6 address doesn't belong to service type." + + +def assert_network_in_service( + ipv4_network: Optional[V4ServiceNetwork] = None, + ipv6_network: Optional[V6ServiceNetwork] = None, + oss_ipv4_containers=None, + oss_ipv6_containers=None, + oss_ipv4_networks=None, + oss_ipv6_networks=None, +): + # IPv4 + if ipv4_network: + if oss_ipv4_containers: + assert any( + ipv4_network.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers + ), "Network doesn't belong to service type." + else: + assert ipv4_network in oss_ipv4_networks, "Network doesn't belong to service type." + + # IPv6 + if ipv6_network: + if oss_ipv6_containers: + assert any( + ipv6_network.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers + ), "Network doesn't belong to service type." + else: + assert ipv6_network in oss_ipv6_networks, "Network doesn't belong to service type." + + +def find_networks(network_container: Optional[str] = "", network: Optional[str] = "", ip_version: int = 4): + """Get all networks optinally filtering by a container or by a network. + + Args: + ---- + network_container (str, optional): container to filter by + network (str, optional): network to filter by + ip_version (int): 4 or 6 + + Returns: + ------- + (list) all found networks mathing the args, which may be empty. + + """ + assert ip_version in [4, 6] + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + endpoint = "network" if ip_version == 4 else "ipv6network" + params = None + if network_container: + params = {"network_container": network_container} + elif network: + params = {"network": network, "_return_fields": "comment"} + r = requests.get( + f"{wapi(infoblox_params)}/{endpoint}", + params=params, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, # noqa: S501 + timeout=REQUESTS_TIMEOUT, + ) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + return r.json() + + +def allocate_network_inner( + infoblox_params: settings.InfoBloxParams, + network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams], + ip_version: int = 4, + comment: Optional[str] = "", + extattrs: Optional[dict] = None, +) -> Union[V4ServiceNetwork, V6ServiceNetwork]: + if extattrs is None: + extattrs = {} + assert ip_version in [4, 6] + endpoint = "network" if ip_version == 4 else "ipv6network" + ip_container = "networkcontainer" if ip_version == 4 else "ipv6networkcontainer" + + assert network_params.containers, ( + "No containers available to allocate networks for this service." + "Maybe you want to allocate a host from a network directly?" + ) + + # only return in the response the allocated network, not all available + # TODO: any validation needed for extrattrs wherever it's used? + req_payload = { + "network": { + "_object_function": "next_available_network", + "_parameters": {"cidr": network_params.mask}, + "_object": ip_container, + "_object_parameters": {"network": str(network_params.containers[0])}, + "_result_field": "networks", + }, + "comment": comment, + "extattrs": extattrs, + } + + container_index = 0 + while True: + r = requests.post( + f"{wapi(infoblox_params)}/{endpoint}", + params={"_return_fields": "network"}, + json=req_payload, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + headers={"content-type": "application/json"}, + verify=False, # noqa: S501 + timeout=REQUESTS_TIMEOUT, ) - assert host_addresses is None - - # Fail because non-matching CNAME - with pytest.raises(AssertionError): - host_addresses = ipam.delete_host( - hostname="ha_lo", - host_addresses=input_host_addresses, - cname_aliases=["alias1.ha", "alias2.ha", "alias3.ha"], - service_type="LO", + if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL): + break + # Container full: try with next valid container for service (if any) + container_index += 1 + if len(network_params.containers) < (container_index + 1): + break + req_payload["network"]["_object_parameters"]["network"] = str(network_params.containers[container_index]) + + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + assert "network" in r.json() + allocated_network = r.json()["network"] + if ip_version == 4: + return V4ServiceNetwork(v4=ipaddress.ip_network(allocated_network)) + return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network)) + + +def allocate_ipv4_network( + service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None +) -> V4ServiceNetwork: + """Allocate IPv4 network within the container of the specified service type. + + Args: + ---- + service_type (str): the name of the service type (e.g. "TRUNK") + comment (str, optional): a custom comment to write in the comment field in IPAM + extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"}) + + Returns: + ------- + (V4ServiceNetwork): the allocated network + + """ + if extattrs is None: + extattrs = {} + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." + return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs) + + +def allocate_ipv6_network( + service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None +) -> V6ServiceNetwork: + """Allocate IPv6 network within the container of the specified service type. + + Args: + ---- + service_type (str): the name of the service type (e.g. "TRUNK") + comment (str, optional): a custom comment to write in the comment field in IPAM + extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"}) + + Returns: + ------- + (V4ServiceNetwork): the allocated network + + """ + if extattrs is None: + extattrs = {} + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." + return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6, comment, extattrs) + + +def allocate_networks( + service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None +) -> ServiceNetworks: + """Allocate IPv4 and IPv6 network for the specified service type.""" + if extattrs is None: + extattrs = {} + v4_service_network = allocate_ipv4_network(service_type=service_type, comment=comment, extattrs=extattrs) + v6_service_network = allocate_ipv6_network(service_type=service_type, comment=comment, extattrs=extattrs) + return ServiceNetworks(v4=v4_service_network.v4, v6=v6_service_network.v6) + + +def find_next_available_ip(infoblox_params, network_ref: str = ""): + """Find the next available IP address from a network given its ref. + + Args: + ---- + infoblox_params (settings.InfoBloxParams): infoblox params + network_ref (str): the network to find the next available IP, in InfoBlox reference format + + Returns: + ------- + (str): next available IP in the network, or "NETWORK_FULL" if there's no space in the network + + """ + r = requests.post( + f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1", + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, # noqa: S501 + timeout=REQUESTS_TIMEOUT, + ) + + if match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL): + return "NETWORK_FULL" + + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert "ips" in r.json() + received_ip = r.json()["ips"] + assert len(received_ip) == 1 + return received_ip[0] + + +def allocate_host_inner( # noqa: C901 + hostname: str = "", + addrs: Optional[Tuple] = None, + networks: Optional[Tuple] = None, + cname_aliases: Optional[list] = None, + dns_view: Optional[str] = "default", + extattrs: Optional[dict] = None, +) -> Union[HostAddresses, str]: + # TODO: should hostnames be unique + # (fail if hostname already exists in this domain/service)? + if cname_aliases is None: + cname_aliases = [] + if extattrs is None: + extattrs = {} + assert addrs or networks, "Neither networks nor host addresses could be derived to allocate host." + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + + # If networks isn't None, allocate host in those networks. + if networks: + ipv4_network = networks[0] + ipv6_network = networks[1] + assert ip_network_version(ipv4_network) == 4 + assert ip_network_version(ipv6_network) == 6 + + # Find the next available IP address in each network + # If requested error doesn't exist, return error + network_info = find_networks(network=ipv4_network, ip_version=4) + if len(network_info) != 1: + return "IPV4_NETWORK_NOT_FOUND" + assert "_ref" in network_info[0] + ipv4_addr = find_next_available_ip(infoblox_params, network_info[0]["_ref"]) + + network_info = find_networks(network=ipv6_network, ip_version=6) + if len(network_info) != 1: + return "IPV6_NETWORK_NOT_FOUND" + assert "_ref" in network_info[0] + ipv6_addr = find_next_available_ip(infoblox_params, network_info[0]["_ref"]) + + # If couldn't find next available IPs, return error + if ipv4_addr == "NETWORK_FULL" or ipv6_addr == "NETWORK_FULL": + if ipv4_addr == "NETWORK_FULL": + return "IPV4_NETWORK_FULL" + if ipv6_addr == "NETWORK_FULL": + return "IPV6_NETWORK_FULL" + + # Otherwise if addrs isn't None, allocate host with those addresses. + else: + ipv4_addr = addrs[0] + ipv6_addr = addrs[1] + assert ip_addr_version(ipv4_addr) == 4 + assert ip_addr_version(ipv6_addr) == 6 + + # hostname parameter must be full name including domain name. + req_payload = { + "ipv4addrs": [{"ipv4addr": ipv4_addr}], + "ipv6addrs": [{"ipv6addr": ipv6_addr}], + "name": hostname, + "configure_for_dns": True, + "view": dns_view, + "extattrs": extattrs, + } + + r = requests.post( + f"{wapi(infoblox_params)}/record:host", + json=req_payload, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, # noqa: S501 + timeout=REQUESTS_TIMEOUT, + ) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert isinstance(r.json(), str) + assert r.json().startswith("record:host/") + + if cname_aliases: + cname_req_payload = {"name": "", "canonical": hostname, "view": dns_view, "extattrs": extattrs} + + for alias in cname_aliases: + cname_req_payload["name"] = alias + r = requests.post( + f"{wapi(infoblox_params)}/record:cname", + json=cname_req_payload, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, # noqa: S501 + timeout=REQUESTS_TIMEOUT, + ) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert r.json().startswith("record:cname/") + + return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr)) + + +def allocate_host( # noqa: C901 + hostname: str = "", + service_type: str = "", + service_networks: Optional[ServiceNetworks] = None, + host_addresses: Optional[HostAddresses] = None, + cname_aliases: Optional[list] = None, + extattrs: Optional[dict] = None, +) -> HostAddresses: + """Allocate host record with both IPv4 and IPv6 address, and respective DNS A and AAAA records. + + Args: + ---- + hostname (str): hostname of the host (without domain name, which is taken from the service type) + service_type (str): the name of the service type (e.g. "TRUNK") + service_networks (ServiceNetworks, optional): ipv4 and ipv6 network to allocate host + host_addresses (HostAddresses, optional): ipv4 and ipv6 addresses to allocate host (service_networks has precedence) + cname_aliases (list, optional): to create CNAME records in addition to the host record + extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"}) + + Returns: + ------- + (HostAddresses): ipv4 and ipv6 addresses of the allocated host + + """ + if cname_aliases is None: + cname_aliases = [] + if extattrs is None: + extattrs = {} + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + + assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." + oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers + oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers + oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks + oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks + domain_name = getattr(ipam_params, service_type).domain_name + dns_view = getattr(ipam_params, service_type).dns_view + + assert (oss_ipv4_containers and oss_ipv6_containers) or ( + oss_ipv4_networks and oss_ipv6_networks + ), "This service is missing either containers or networks configuration." + assert domain_name, "This service is missing domain_name configuration." + assert dns_view, "This service is missing dns_view configuration." + + if cname_aliases: + cname_aliases = [alias + domain_name for alias in cname_aliases] + + # When neither service_networks not host_addresses are provided: + # If service has configured containers, new ipv4 and ipv6 networks are created and those are used. + # Note that in this case extattrs is for the hosts and not for the networks. + # If service doesn't have configured containers and has configured networks instead, the configured + # networks are used (they are filled up in order of appearance in the configuration file). + if not service_networks and not host_addresses: + if oss_ipv4_containers and oss_ipv6_containers: + # This service has configured containers. + # Use them to allocate new networks that can allocate the hosts. + + # IPv4 + ipv4_network = str(allocate_ipv4_network(service_type=service_type).v4) + assert ipv4_network, "No available space for IPv4 networks for this service type." + + # IPv6 + ipv6_network = str(allocate_ipv6_network(service_type=service_type).v6) + assert ipv6_network, "No available space for IPv6 networks for this service type." + + elif oss_ipv4_networks and oss_ipv6_networks: + # This service has configured networks. + # Allocate a host inside an ipv4 and ipv6 network from among them. + ipv4_network = str(oss_ipv4_networks[0]) + ipv6_network = str(oss_ipv6_networks[0]) + + ipv4_network_index = 0 + ipv6_network_index = 0 + while True: + network_tuple = (ipv4_network, ipv6_network) + host = allocate_host_inner( + hostname=hostname + domain_name, + networks=network_tuple, + cname_aliases=cname_aliases, + dns_view=dns_view, + extattrs=extattrs, + ) + + if "NETWORK_FULL" not in host and "NETWORK_NOT_FOUND" not in host: + break + if "IPV4" in host: + ipv4_network_index += 1 + assert oss_ipv4_networks, "No available space in any IPv4 network for this service." + assert ipv4_network_index < len( + oss_ipv4_networks + ), "No available space in any IPv4 network for this service." + ipv4_network = str(oss_ipv4_networks[ipv4_network_index]) + else: # "IPV6" in host + ipv6_network_index += 1 + assert oss_ipv6_networks, "No available space in any IPv6 network for this service." + assert ipv6_network_index < len( + oss_ipv6_networks + ), "No available space in any IPv6 network for this service." + ipv6_network = str(oss_ipv6_networks[ipv6_network_index]) + + elif service_networks: + ipv4_network = service_networks.v4 + ipv6_network = service_networks.v6 + assert_network_in_service( + ipv4_network, ipv6_network, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks ) - assert host_addresses is None - - -@responses.activate -def test_validate_network(data_config_filename: PathLike): - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), - json=[ - { - "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 - "network": "10.255.255.0/26", - "network_view": "default", - "comment": "the subscription id is 0123456789abcdef", - } - ], - ) - service_network = ipam.validate_network( - gso_subscription_id="0123456789abcdef", network=ipam.ipaddress.ip_network("10.255.255.0/26") - ) - assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26")) - - # Fail because non-matching subscription id - with pytest.raises(AssertionError): - service_network = ipam.validate_network( - gso_subscription_id="1a2b3c4d5e6f7890", network=ipam.ipaddress.ip_network("10.255.255.0/26") + host = allocate_host_inner( + hostname=hostname + domain_name, + networks=(str(ipv4_network), str(ipv6_network)), + cname_aliases=cname_aliases, + dns_view=dns_view, + extattrs=extattrs, + ) + assert "NETWORK_FULL" not in host, "Network is full." + assert "NETWORK_NOT_FOUND" not in host, "Network does not exist in IPAM. Create it first." + + elif host_addresses: + ipv4_addr = host_addresses.v4 + ipv6_addr = host_addresses.v6 + assert_host_in_service( + ipv4_addr, ipv6_addr, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks ) - assert service_network is None - - -@responses.activate -def test_validate_host(data_config_filename: PathLike): - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*record:host.*"), - json=[ - { - "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default", # noqa: E501 - "ipv4addrs": [ - { - "_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuMS40.255.255.1/ha_lo.gso/default", # noqa: E501 - "configure_for_dhcp": False, - "host": "ha_lo.gso", - "ipv4addr": "10.255.255.1", - } - ], - "ipv6addrs": [ - { - "_ref": "record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4:dead%3Abeef%3A%3A1/ha_lo.gso/default", # noqa: E501 - "configure_for_dhcp": False, - "host": "ha_lo.gso", - "ipv6addr": "dead:beef::1", - } - ], - "name": "ha_lo.gso", - "view": "default", - } - ], - ) - - responses.add( - method=responses.GET, - url=re.compile(r".*/wapi.*record:cname.*"), - json=[ - { - "_ref": "record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczE:alias1.ha.gso/default", # noqa: E501 - "canonical": "hA_LO.lo", - "name": "alias1.ha.lo", - "view": "default", - }, - { - "_ref": "record:cname/5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczI:alias2.ha.gso/default", # noqa: E501 - "canonical": "hA_LO.lo", - "name": "alias2.ha.lo", - "view": "default", - }, - ], - ) - input_host_addresses = ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") - ) - host_addresses = ipam.validate_host( - hostname="ha_lo", - host_addresses=input_host_addresses, - cname_aliases=["alias1.ha", "alias2.ha"], - service_type="LO", - ) - assert host_addresses == ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") + host = allocate_host_inner( + hostname=hostname + domain_name, + addrs=(str(ipv4_addr), str(ipv6_addr)), + cname_aliases=cname_aliases, + dns_view=dns_view, + extattrs=extattrs, + ) + assert "NETWORK_FULL" not in host + + return host + + +def delete_network( + network: ipaddress.ip_network = None, service_type: str = "" +) -> Union[V4ServiceNetwork, V6ServiceNetwork]: + """Delete IPv4 or IPv6 network by CIDR.""" + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert ipam_params.INFOBLOX + infoblox_params = ipam_params.INFOBLOX + + assert network, "No network specified to delete." + assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." + + ip_version = ip_network_version(str(network)) + + # Ensure that the network to be deleted is under the service type. + # Otherwise user isn't allowed to delete it + oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers + oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers + oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks + oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks + ipv4_network = None + ipv6_network = None + if ip_version == 4: + ipv4_network = network + else: + ipv6_network = network + assert_network_in_service( + ipv4_network, ipv6_network, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks + ) + + network_info = find_networks(network=str(network), ip_version=ip_version) + assert len(network_info) == 1, "Network to delete does not exist in IPAM." + assert "_ref" in network_info[0], "Network to delete does not exist in IPAM." + + r = requests.delete( + f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}', + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, # noqa: S501 + timeout=REQUESTS_TIMEOUT, + ) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + # Extract ipv4/ipv6 address from the network reference obtained in the + # response + r_text = r.text + network_address = ipaddress.ip_network(r_text.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) + if ip_version == 4: + return V4ServiceNetwork(v4=ipaddress.ip_network(network_address)) + return V6ServiceNetwork(v6=ipaddress.ip_network(network_address)) + + +def delete_host( + hostname: str = "", + host_addresses: HostAddresses = None, + cname_aliases: Optional[list] = None, + service_type: str = "", +) -> HostAddresses: + """Delete host record and associated CNAME records. + + All arguments passed to this function must match together a host record in + IPAM, and all CNAME records associated to it must also be passed exactly. + """ + if cname_aliases is None: + cname_aliases = [] + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert ipam_params.INFOBLOX + infoblox_params = ipam_params.INFOBLOX + + assert host_addresses, "No host specified to delete." + assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." + oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers + oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers + oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks + oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks + domain_name = getattr(ipam_params, service_type).domain_name + dns_view = getattr(ipam_params, service_type).dns_view + ipv4_addr = str(host_addresses.v4) + ipv6_addr = str(host_addresses.v6) + + assert_host_in_service( + host_addresses.v4, + host_addresses.v6, + oss_ipv4_containers, + oss_ipv6_containers, + oss_ipv4_networks, + oss_ipv6_networks, + ) + + # Find host record reference + r = requests.get( + f"{wapi(infoblox_params)}/record:host", + params={ + "name": (hostname + domain_name).lower(), # hostnames are lowercase + "ipv4addr": ipv4_addr, + "ipv6addr": ipv6_addr, + "view": dns_view, + }, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, # noqa: S501 + timeout=REQUESTS_TIMEOUT, + ) + host_data = r.json() + assert len(host_data) == 1, "Host to delete does not exist in IPAM." + assert "_ref" in host_data[0], "Host to delete does not exist in IPAM." + host_ref = host_data[0]["_ref"] + + # Find CNAME records reference + r = requests.get( + f"{wapi(infoblox_params)}/record:cname", + params={ + "canonical": hostname + domain_name, + "view": dns_view, + }, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, # noqa: S501 + timeout=REQUESTS_TIMEOUT, + ) + cname_data = r.json() + provided_cnames = [item + domain_name for item in cname_aliases] + found_cnames = [item["name"] for item in cname_data if "name" in item] + assert provided_cnames == found_cnames, "Provided CNAME alias names don't match the ones poiting to hostname." + + # Delete the host record + r = requests.delete( + f"{wapi(infoblox_params)}/{host_ref}", + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, # noqa: S501 + timeout=REQUESTS_TIMEOUT, + ) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + # Delete the CNAME records + cname_refs = [item["_ref"] for item in cname_data if "name" in item] + for cname_ref in cname_refs: + r = requests.delete( + f"{wapi(infoblox_params)}/{cname_ref}", + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, # noqa: S501 + timeout=REQUESTS_TIMEOUT, + ) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + return host_addresses + + +def validate_network( + gso_subscription_id: str = "", network: ipaddress.ip_network = None +) -> Union[V4ServiceNetwork, V6ServiceNetwork]: + """Validate IPv4 or IPv6 network. + + Check if the specified network exist, and, if it does, check if its comment field contains gso_subscription_id. + Returns the network if validation successful. + """ + assert network, "No network specified to validate." + + ip_version = ip_network_version(str(network)) + network_info = find_networks(network=str(network), ip_version=ip_version) + assert len(network_info) == 1 and "_ref" in network_info[0], "Network to validate not found in IPAM." + assert "comment" in network_info[0], "Network to validate does not have comment in IPAM." + assert ( + gso_subscription_id in network_info[0]["comment"] + ), "GSO subscription ID does not match the one in the comment field of the IPAM network." + + if ip_version == 4: + return V4ServiceNetwork(v4=network) + return V6ServiceNetwork(v6=network) + + +def validate_host( + hostname: str = "", + host_addresses: HostAddresses = None, + cname_aliases: Optional[list] = None, + service_type: str = "", +) -> HostAddresses: + """Validate host. + + Check if all arguments passed to this function match together a host record in + IPAM, and all CNAME records associated to it also match exactly. + Returns the host if validation successful. + """ + if cname_aliases is None: + cname_aliases = [] + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert ipam_params.INFOBLOX + infoblox_params = ipam_params.INFOBLOX + + assert hostname and host_addresses, "No host specified to validate. Either hostname or host_addresses missing." + domain_name = getattr(ipam_params, service_type).domain_name + ipv4_addr = str(host_addresses.v4) + ipv6_addr = str(host_addresses.v6) + dns_view = getattr(ipam_params, service_type).dns_view + + # Find host record reference + r = requests.get( + f"{wapi(infoblox_params)}/record:host", + params={ + "name": (hostname + domain_name).lower(), # hostnames are lowercase + "ipv4addr": ipv4_addr, + "ipv6addr": ipv6_addr, + "view": dns_view, + }, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, # noqa: S501 + timeout=REQUESTS_TIMEOUT, + ) + host_data = r.json() + assert len(host_data) == 1, "Host to validate does not exist in IPAM." + assert "_ref" in host_data[0], "Host to validate does not exist in IPAM." + + # Find CNAME records reference + r = requests.get( + f"{wapi(infoblox_params)}/record:cname", + params={ + "canonical": hostname + domain_name, + "view": dns_view, + }, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, # noqa: S501 + timeout=REQUESTS_TIMEOUT, ) + cname_data = r.json() + provided_cnames = [item + domain_name for item in cname_aliases] + found_cnames = [item["name"] for item in cname_data if "name" in item] + assert provided_cnames == found_cnames, "Provided CNAME alias names don't match the ones poiting to hostname." - # Fail because non-matching hostname - host_addresses = ipam.validate_host( - hostname="wrong_hostname", - host_addresses=input_host_addresses, - cname_aliases=["alias1.ha", "alias2.ha"], - service_type="LO", - ) - with pytest.raises(AssertionError): - host_addresses = ipam.HostAddresses( - v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") - ) - assert host_addresses is None + return host_addresses diff --git a/gso/workflows/device/terminate_device.py b/gso/workflows/device/terminate_device.py index ac81a231189fa8d0582569b5ecce35ec73acded9..96d60f43470697ae03302c56830de5e8ead8e61f 100644 --- a/gso/workflows/device/terminate_device.py +++ b/gso/workflows/device/terminate_device.py @@ -11,7 +11,7 @@ from orchestrator.workflows.utils import wrap_modify_initial_input_form from gso.products.product_types.device import Device from gso.services import ipam -from gso.services.ipam import HostAddresses, V4ServiceNetwork, V6ServiceNetwork +from gso.services.ipam import V4HostAddress, V4ServiceNetwork, V6HostAddress, V6ServiceNetwork logger = logging.getLogger(__name__)