diff --git a/gso/services/ipam.py b/gso/services/ipam.py index a97994d0a022588440319964a666205d351e4e0c..6299c9c401e2cf0352825bf5ab2c6d70a635acdf 100644 --- a/gso/services/ipam.py +++ b/gso/services/ipam.py @@ -1,7 +1,7 @@ # mypy: ignore-errors import ipaddress from enum import Enum -from typing import Optional, Union +from typing import Optional, Tuple, Union import requests from pydantic import BaseSettings @@ -42,6 +42,9 @@ class IPAMErrors(Enum): NETWORK_FULL = 400, "Cannot find 1 available IP address(es) in this network" +REQUESTS_TIMEOUT = 20 + + # TODO: remove this! # lab infoblox cert is not valid for the ipv4 address # ... disable warnings for now @@ -56,7 +59,7 @@ def wapi(infoblox_params: settings.InfoBloxParams): return f"https://{infoblox_params.host}" f"/wapi/{infoblox_params.wapi_version}" -def ip_addr_version(addr): +def ip_addr_version(addr: str = ""): ip_version = None ip_addr = ipaddress.ip_address(addr) if isinstance(ip_addr, ipaddress.IPv4Address): @@ -67,7 +70,7 @@ def ip_addr_version(addr): return ip_version -def ip_network_version(network): +def ip_network_version(network: str = ""): ip_version = None ip_network = ipaddress.ip_network(network) if isinstance(ip_network, ipaddress.IPv4Network): @@ -79,12 +82,12 @@ def ip_network_version(network): def assert_host_in_service( - ipv4_addr="", - ipv6_addr="", - oss_ipv4_containers=None, - oss_ipv6_containers=None, - oss_ipv4_networks=None, - oss_ipv6_networks=None, + ipv4_addr: str = "", + ipv6_addr: str = "", + oss_ipv4_containers = None, + oss_ipv6_containers = None, + oss_ipv4_networks = None, + oss_ipv6_networks = None, ): # IPv4 if oss_ipv4_containers: @@ -107,13 +110,12 @@ def assert_host_in_service( ), "Host's IPv6 address doesn't belong to service type." -def find_networks(network_container=None, network=None, ip_version=4): - """If network_container is not None, find all networks within the specified - container. +def find_networks(network_container: Optional[str] = "", network: Optional[str] = "", ip_version: int = 4): + """If network_container is not None, find all networks within the specified container. Otherwise, if network is not None, find the specified network. Otherwise find all networks. A list of all found networks is returned (an HTTP 200 code - may be returned with an empty list.). + may be returned with an empty list). """ assert ip_version in [4, 6] oss = settings.load_oss_params() @@ -130,18 +132,21 @@ def find_networks(network_container=None, network=None, ip_version=4): params=params, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, + timeout=REQUESTS_TIMEOUT ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" return r.json() -def allocate_network( +def allocate_network_inner( infoblox_params: settings.InfoBloxParams, network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams], - ip_version=4, - comment="", - extattrs={}, + ip_version: int = 4, + comment: Optional[str] = "", + extattrs: Optional[dict] = None, ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: + if extattrs is None: + extattrs = {} assert ip_version in [4, 6] endpoint = "network" if ip_version == 4 else "ipv6network" ip_container = "networkcontainer" if ip_version == 4 else "ipv6networkcontainer" @@ -174,6 +179,7 @@ def allocate_network( auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), headers={"content-type": "application/json"}, verify=False, + timeout=REQUESTS_TIMEOUT ) if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL): break @@ -189,29 +195,49 @@ def allocate_network( allocated_network = r.json()["network"] if ip_version == 4: return V4ServiceNetwork(v4=ipaddress.ip_network(allocated_network)) - else: - return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network)) + return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network)) -def allocate_service_ipv4_network(service_type="", comment="", extattrs={}) -> V4ServiceNetwork: +def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V4ServiceNetwork: """Allocate IPv4 network within the container of the specified service type.""" + if extattrs is None: + extattrs = {} oss = settings.load_oss_params() assert oss.IPAM ipam_params = oss.IPAM assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - return allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs) + return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs) -def allocate_service_ipv6_network(service_type="", comment="", extattrs={}) -> V6ServiceNetwork: +def allocate_ipv6_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V6ServiceNetwork: """Allocate IPv6 network within the container of the specified service type.""" + if extattrs is None: + extattrs = {} oss = settings.load_oss_params() assert oss.IPAM ipam_params = oss.IPAM assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - return allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6, comment, extattrs) + return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6, comment, extattrs) + + +def allocate_networks( + service_type: str = "", + comment: Optional[str] = "", + extattrs: Optional[dict] = None +) -> ServiceNetworks: + """Allocate IPv4 and IPv6 network for the specified service type.""" + if extattrs is None: + extattrs = {} + v4_service_network = allocate_ipv4_network( + service_type=service_type, comment=comment, extattrs=extattrs + ) + v6_service_network = allocate_ipv6_network( + service_type=service_type, comment=comment, extattrs=extattrs + ) + return ServiceNetworks(v4=v4_service_network.v4, v6=v6_service_network.v6) -def find_next_available_ip(infoblox_params, network_ref=""): +def find_next_available_ip(infoblox_params, network_ref: str = ""): """Find the next available IP address from a network given its ref. Returns "NETWORK_FULL" if there's no space in the network. Otherwise returns the next available IP address in the network. @@ -220,6 +246,7 @@ def find_next_available_ip(infoblox_params, network_ref=""): f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1", # noqa: E501 auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, + timeout=REQUESTS_TIMEOUT ) if match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL): @@ -232,8 +259,13 @@ def find_next_available_ip(infoblox_params, network_ref=""): return received_ip[0] -def allocate_host( - hostname="", addrs=None, networks=None, cname_aliases=[], dns_view="default", extattrs={} +def allocate_host_inner( + hostname: str = "", + addrs: Optional[Tuple] = None, + networks: Optional[Tuple] = None, + cname_aliases: Optional[list] = None, + dns_view: Optional[str] = "default", + extattrs: Optional[dict] = None ) -> Union[HostAddresses, str]: """If networks is not None, allocate host in those networks. Otherwise if addrs is not None, allocate host with those addresses. @@ -245,6 +277,10 @@ def allocate_host( """ # TODO: should hostnames be unique # (i.e. fail if hostname already exists in this domain/service)? + if cname_aliases is None: + cname_aliases = [] + if extattrs is None: + extattrs = {} assert addrs or networks, "You must specify either the host addresses or the networks CIDR." oss = settings.load_oss_params() assert oss.IPAM.INFOBLOX @@ -296,6 +332,7 @@ def allocate_host( json=req_payload, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, + timeout=REQUESTS_TIMEOUT ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert isinstance(r.json(), str) @@ -311,6 +348,7 @@ def allocate_host( json=cname_req_payload, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, + timeout=REQUESTS_TIMEOUT ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert r.json().startswith("record:cname/") @@ -318,13 +356,13 @@ def allocate_host( return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr)) -def allocate_service_host( - hostname="", - service_type="", - service_networks: ServiceNetworks = None, - host_addresses: HostAddresses = None, - cname_aliases=None, - extattrs={}, +def allocate_host( + hostname: str = "", + service_type: str = "", + service_networks: Optional[ServiceNetworks] = None, + host_addresses: Optional[ServiceNetworks] = None, + cname_aliases: Optional[list] = None, + extattrs: Optional[dict] = None, ) -> HostAddresses: """Allocate host record with both IPv4 and IPv6 address, and respective DNS A and AAAA records. @@ -342,6 +380,10 @@ def allocate_service_host( The domain name is taken from the service type and appended to the specified hostname. """ + if cname_aliases is None: + cname_aliases = [] + if extattrs is None: + extattrs = {} oss = settings.load_oss_params() assert oss.IPAM ipam_params = oss.IPAM @@ -369,11 +411,11 @@ def allocate_service_host( # Use them to allocate new networks that can allocate the hosts. # IPv4 - ipv4_network = str(allocate_service_ipv4_network(service_type=service_type).v4) + ipv4_network = str(allocate_ipv4_network(service_type=service_type).v4) assert ipv4_network, "No available space for IPv4 networks for this service type." # IPv6 - ipv6_network = str(allocate_service_ipv6_network(service_type=service_type).v6) + ipv6_network = str(allocate_ipv6_network(service_type=service_type).v6) assert ipv6_network, "No available space for IPv6 networks for this service type." elif oss_ipv4_networks and oss_ipv6_networks: @@ -386,7 +428,7 @@ def allocate_service_host( ipv6_network_index = 0 while True: network_tuple = (ipv4_network, ipv6_network) - host = allocate_host( + host = allocate_host_inner( hostname=hostname + domain_name, networks=network_tuple, cname_aliases=cname_aliases, @@ -396,7 +438,7 @@ def allocate_service_host( if "NETWORK_FULL" not in host and "NETWORK_NOT_FOUND" not in host: break - elif "IPV4" in host: + if "IPV4" in host: ipv4_network_index += 1 assert oss_ipv4_networks, "No available space in any IPv4 network for this service." assert ipv4_network_index < len( @@ -426,7 +468,7 @@ def allocate_service_host( else: assert ipv6_network in oss_ipv6_networks - host = allocate_host( + host = allocate_host_inner( hostname=hostname + domain_name, networks=(str(ipv4_network), str(ipv6_network)), cname_aliases=cname_aliases, @@ -443,7 +485,7 @@ def allocate_service_host( ipv4_addr, ipv6_addr, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks ) - host = allocate_host( + host = allocate_host_inner( hostname=hostname + domain_name, addrs=(str(ipv4_addr), str(ipv6_addr)), cname_aliases=cname_aliases, @@ -455,8 +497,9 @@ def allocate_service_host( return host -def delete_service_network( - network: Union[V4ServiceNetwork, V6ServiceNetwork] = None, service_type="" +def delete_network( + network: Union[V4ServiceNetwork, V6ServiceNetwork] = None, + service_type: str = "" ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: """Delete IPv4 or IPv6 network by CIDR.""" oss = settings.load_oss_params() @@ -499,27 +542,31 @@ def delete_service_network( f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}', auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, + timeout=REQUESTS_TIMEOUT ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" # Extract ipv4/ipv6 address from the network reference obtained in the # response r_text = r.text - print(r_text) network_address = ipaddress.ip_network(r_text.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) if ip_version == 4: return V4ServiceNetwork(v4=ipaddress.ip_network(network_address)) - else: - return V6ServiceNetwork(v6=ipaddress.ip_network(network_address)) + return V6ServiceNetwork(v6=ipaddress.ip_network(network_address)) -def delete_service_host( - hostname="", host_addresses: HostAddresses = None, cname_aliases=[], service_type="" +def delete_host( + hostname: str = "", + host_addresses: HostAddresses = None, + cname_aliases: Optional[list] = None, + service_type: str = "" ) -> Union[V4HostAddress, V6HostAddress]: """Delete host record and associated CNAME records. All arguments passed to this function must match together a host record in IPAM, and all CNAME records associated to it must also be passed exactly. """ + if cname_aliases is None: + cname_aliases = [] oss = settings.load_oss_params() assert oss.IPAM ipam_params = oss.IPAM @@ -556,6 +603,7 @@ def delete_service_host( }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, + timeout=REQUESTS_TIMEOUT ) host_data = r.json() assert len(host_data) == 1, "Host does not exist." @@ -571,6 +619,7 @@ def delete_service_host( }, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, + timeout=REQUESTS_TIMEOUT ) cname_data = r.json() provided_cnames = [item + domain_name for item in cname_aliases] @@ -582,6 +631,7 @@ def delete_service_host( f"{wapi(infoblox_params)}/{host_ref}", auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, + timeout=REQUESTS_TIMEOUT ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" @@ -592,39 +642,8 @@ def delete_service_host( f"{wapi(infoblox_params)}/{cname_ref}", auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False, + timeout=REQUESTS_TIMEOUT ) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" return host_addresses - - -def new_service_networks(service_type: str = "", comment: str = "", extattrs: dict = None) -> ServiceNetworks: - if extattrs is None: - extattrs = {} - v4_service_network = allocate_service_ipv4_network( - service_type=service_type, comment=comment, extattrs=extattrs - ) - v6_service_network = allocate_service_ipv6_network( - service_type=service_type, comment=comment, extattrs=extattrs - ) - return ServiceNetworks(v4=v4_service_network.v4, v6=v6_service_network.v6) - - -def new_service_host( - hostname: str, - service_type: str = "", - service_networks: Optional[ServiceNetworks] = None, - host_addresses: Optional[HostAddresses] = None, - cname_aliases: list = None, - extattrs: dict = None, -) -> HostAddresses: - if extattrs is None: - extattrs = {} - return allocate_service_host( - hostname=hostname, - service_type=service_type, - service_networks=service_networks, - host_addresses=host_addresses, - cname_aliases=cname_aliases, - extattrs=extattrs, - ) diff --git a/gso/workflows/device/create_device.py b/gso/workflows/device/create_device.py index f162f5d4cea9a25b4f77de4c5698b7d18566ee0e..49cb958063550349477bb45df1ef050ebbef7bcf 100644 --- a/gso/workflows/device/create_device.py +++ b/gso/workflows/device/create_device.py @@ -17,7 +17,7 @@ from gso.products.product_blocks import device as device_pb from gso.products.product_types import device from gso.products.product_types.device import DeviceInactive, DeviceProvisioning from gso.products.product_types.site import Site -from gso.services import _ipam, provisioning_proxy +from gso.services import ipam, provisioning_proxy from gso.services.provisioning_proxy import await_pp_results, confirm_pp_results @@ -76,17 +76,17 @@ def iso_from_ipv4(ipv4_address: ipaddress.IPv4Address) -> str: def get_info_from_ipam(subscription: DeviceProvisioning) -> State: lo0_alias = re.sub(".geant.net", "", subscription.device.device_fqdn) lo0_name = f"lo0.{lo0_alias}" - lo0_addr = _ipam.allocate_service_host(hostname=lo0_name, service_type="LO", cname_aliases=[lo0_alias]) + lo0_addr = ipam.allocate_host(hostname=lo0_name, service_type="LO", cname_aliases=[lo0_alias]) subscription.device.device_lo_ipv4_address = lo0_addr.v4 subscription.device.device_lo_ipv6_address = lo0_addr.v6 subscription.device.device_lo_iso_address = iso_from_ipv4(subscription.device.device_lo_ipv4_address) - subscription.device.device_si_ipv4_network = _ipam.allocate_service_ipv4_network( + subscription.device.device_si_ipv4_network = ipam.allocate_ipv4_network( service_type="SI", comment=f"SI for {lo0_name}" ).v4 - subscription.device.device_ias_lt_ipv4_network = _ipam.allocate_service_ipv4_network( + subscription.device.device_ias_lt_ipv4_network = ipam.allocate_ipv4_network( service_type="LT_IAS", comment=f"LT for {lo0_name}" ).v4 - subscription.device.device_ias_lt_ipv6_network = _ipam.allocate_service_ipv6_network( + subscription.device.device_ias_lt_ipv6_network = ipam.allocate_ipv6_network( service_type="LT_IAS", comment=f"LT for {lo0_name}" ).v6 return {"subscription": subscription} diff --git a/gso/workflows/device/terminate_device.py b/gso/workflows/device/terminate_device.py index b7564d2a197a652c8b77edc830a5e374488b4459..f9786c11061f79157cbb8b12ef69a24cc4de2f69 100644 --- a/gso/workflows/device/terminate_device.py +++ b/gso/workflows/device/terminate_device.py @@ -40,7 +40,7 @@ def deprovision_loopback_ips(subscription: Device) -> dict[str, V4HostAddress | fqdn_as_list = subscription.device.device_fqdn.split(".") hostname = str(fqdn_as_list[0]) + "." + str(fqdn_as_list[1]) + "." + str(fqdn_as_list[2]) lo0_name = "lo0." + hostname - host_addresses = ipam.delete_service_host( + host_addresses = ipam.delete_host( hostname=lo0_name, host_addresses=input_host_addresses, cname_aliases=[hostname], @@ -51,7 +51,7 @@ def deprovision_loopback_ips(subscription: Device) -> dict[str, V4HostAddress | @step("Deprovision SI- interface IPs from IPAM/DNS") def deprovision_si_ips(subscription: Device) -> dict[str, V4ServiceNetwork | V6ServiceNetwork]: - service_network = ipam.delete_service_network( + service_network = ipam.delete_network( network=ipaddress.ip_network(subscription.device.device_si_ipv4_network), service_type="SI", ) @@ -60,11 +60,11 @@ def deprovision_si_ips(subscription: Device) -> dict[str, V4ServiceNetwork | V6S @step("Deprovision LT- interface (IAS) IPs from IPAM/DNS") def deprovision_lt_ips(subscription: Device) -> dict[str, V4ServiceNetwork | V6ServiceNetwork]: - service_network_v4 = ipam.delete_service_network( + service_network_v4 = ipam.delete_network( network=ipaddress.ip_network(subscription.device.device_ias_lt_ipv4_network), service_type="LT_IAS", ) - service_network_v6 = ipam.delete_service_network( + service_network_v6 = ipam.delete_network( network=ipaddress.ip_network(subscription.device.device_ias_lt_ipv6_network), service_type="LT_IAS", ) diff --git a/gso/workflows/iptrunk/create_iptrunk.py b/gso/workflows/iptrunk/create_iptrunk.py index 2a5a0b471a0096fcd0478e1cc5ccd3b25be5c781..e76022dd7623ee3917299447d57ba6d72fc44acb 100644 --- a/gso/workflows/iptrunk/create_iptrunk.py +++ b/gso/workflows/iptrunk/create_iptrunk.py @@ -13,7 +13,7 @@ from gso.products.product_blocks import PhyPortCapacity from gso.products.product_blocks.iptrunk import IptrunkType from gso.products.product_types.device import Device from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning -from gso.services import _ipam, provisioning_proxy +from gso.services import ipam, provisioning_proxy from gso.services.provisioning_proxy import await_pp_results, confirm_pp_results @@ -98,11 +98,11 @@ def create_subscription(product: UUIDstr) -> State: @step("Get information from IPAM") def get_info_from_ipam(subscription: IptrunkProvisioning) -> State: # TODO: get info about how these should be generated - subscription.iptrunk.iptrunk_ipv4_network = _ipam.allocate_service_ipv4_network( + subscription.iptrunk.iptrunk_ipv4_network = ipam.allocate_ipv4_network( service_type="TRUNK", comment=subscription.iptrunk.iptrunk_description, ).v4 - subscription.iptrunk.iptrunk_ipv6_network = _ipam.allocate_service_ipv6_network( + subscription.iptrunk.iptrunk_ipv6_network = ipam.allocate_ipv6_network( service_type="TRUNK", comment=subscription.iptrunk.iptrunk_description, ).v6 diff --git a/gso/workflows/iptrunk/terminate_iptrunk.py b/gso/workflows/iptrunk/terminate_iptrunk.py index 0388b7c7e3c116aaea497c779de0053460030866..5f8c58507037def550c4023613fc5925c71b498c 100644 --- a/gso/workflows/iptrunk/terminate_iptrunk.py +++ b/gso/workflows/iptrunk/terminate_iptrunk.py @@ -65,7 +65,7 @@ def deprovision_ip_trunk_real(subscription: Iptrunk, process_id: UUIDstr) -> Sta @step("Deprovision IPv4 networks") def deprovision_ip_trunk_ipv4(subscription: Iptrunk) -> dict[str, V4ServiceNetwork | V6ServiceNetwork]: - service_network = ipam.delete_service_network( + service_network = ipam.delete_network( network=ipaddress.ip_network(subscription.iptrunk.iptrunk_ipv4_network), service_type="TRUNK", ) @@ -74,7 +74,7 @@ def deprovision_ip_trunk_ipv4(subscription: Iptrunk) -> dict[str, V4ServiceNetwo @step("Deprovision IPv6 networks") def deprovision_ip_trunk_ipv6(subscription: Iptrunk) -> dict[str, V4ServiceNetwork | V6ServiceNetwork]: - service_network = ipam.delete_service_network( + service_network = ipam.delete_network( network=ipaddress.ip_network(subscription.iptrunk.iptrunk_ipv6_network), service_type="TRUNK", ) diff --git a/test/test_ipam.py b/test/test_ipam.py index 228b6612e53f6b0fb26802ee2e4b6f88fcccd156..8da9ad49ebf484b97879a62f6f6fd0eea5939325 100644 --- a/test/test_ipam.py +++ b/test/test_ipam.py @@ -9,7 +9,7 @@ from gso.services import ipam @responses.activate -def test_new_service_networks(data_config_filename: PathLike): +def test_allocate_networks(data_config_filename: PathLike): responses.add( method=responses.POST, url=re.compile(r".*/wapi.*/network.*"), @@ -28,19 +28,19 @@ def test_new_service_networks(data_config_filename: PathLike): }, ) - service_networks = ipam.new_service_networks(service_type="TRUNK") + service_networks = ipam.allocate_networks(service_type="TRUNK") assert service_networks == ipam.ServiceNetworks( v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") ) # should fail because this service type has networks instead of containers with pytest.raises(AssertionError): - service_networks = ipam.new_service_networks(service_type="LO") + service_networks = ipam.allocate_networks(service_type="LO") assert service_networks is None @responses.activate -def test_new_service_host(data_config_filename: PathLike): +def test_allocate_host(data_config_filename: PathLike): responses.add( method=responses.POST, url=re.compile(r".*/wapi.*/record:host$"), @@ -135,7 +135,7 @@ def test_new_service_host(data_config_filename: PathLike): ) # test host creation by IP addresses - service_hosts = ipam.new_service_host( + service_hosts = ipam.allocate_host( hostname="test", service_type="TRUNK", host_addresses=ipam.HostAddresses( @@ -147,7 +147,7 @@ def test_new_service_host(data_config_filename: PathLike): ) # test host creation by network addresses - service_hosts = ipam.new_service_host( + service_hosts = ipam.allocate_host( hostname="test", service_type="TRUNK", service_networks=ipam.ServiceNetworks( @@ -159,20 +159,20 @@ def test_new_service_host(data_config_filename: PathLike): ) # test host creation by just service_type when service cfg uses networks - service_hosts = ipam.new_service_host(hostname="test", service_type="LO") + service_hosts = ipam.allocate_host(hostname="test", service_type="LO") assert service_hosts == ipam.HostAddresses( v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") ) # test host creation by just service_type when service cfg uses containers - service_hosts = ipam.new_service_host(hostname="test", service_type="TRUNK") + service_hosts = ipam.allocate_host(hostname="test", service_type="TRUNK") assert service_hosts == ipam.HostAddresses( v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") ) # test host creation that should return a no available IP error with pytest.raises(AssertionError): - service_hosts = ipam.new_service_host( + service_hosts = ipam.allocate_host( hostname="test", service_type="TRUNK", service_networks=ipam.ServiceNetworks( @@ -183,7 +183,7 @@ def test_new_service_host(data_config_filename: PathLike): # test host creation that should return a network not exist error with pytest.raises(AssertionError): - service_hosts = ipam.new_service_host( + service_hosts = ipam.allocate_host( hostname="test", service_type="TRUNK", service_networks=ipam.ServiceNetworks( @@ -194,7 +194,7 @@ def test_new_service_host(data_config_filename: PathLike): @responses.activate -def test_delete_service_network(data_config_filename: PathLike): +def test_delete_network(data_config_filename: PathLike): responses.add( method=responses.GET, url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), @@ -267,29 +267,29 @@ def test_delete_service_network(data_config_filename: PathLike): body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501 ) - service_network = ipam.delete_service_network(network=ipaddress.ip_network("10.255.255.0/26"), service_type="LO") + service_network = ipam.delete_network(network=ipaddress.ip_network("10.255.255.0/26"), service_type="LO") assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26")) with pytest.raises(AssertionError): - service_network = ipam.delete_service_network( + service_network = ipam.delete_network( network=ipaddress.ip_network("10.255.255.20/32"), service_type="LO" ) assert service_network is None - service_network = ipam.delete_service_network( + service_network = ipam.delete_network( network=ipaddress.ip_network("dead:beef::18/128"), service_type="TRUNK" ) assert service_network == ipam.V6ServiceNetwork(v6=ipaddress.ip_network("dead:beef::18/128")) with pytest.raises(AssertionError): - service_network = ipam.delete_service_network( + service_network = ipam.delete_network( network=ipaddress.ip_network("beef:dead::18/128"), service_type="TRUNK" ) assert service_network is None @responses.activate -def test_delete_service_host(data_config_filename: PathLike): +def test_delete_host(data_config_filename: PathLike): responses.add( method=responses.GET, url=re.compile(r".*/wapi.*record:host.*"), @@ -352,7 +352,7 @@ def test_delete_service_host(data_config_filename: PathLike): input_host_addresses = ipam.HostAddresses( v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") ) - host_addresses = ipam.delete_service_host( + host_addresses = ipam.delete_host( hostname="ha_lo", host_addresses=input_host_addresses, cname_aliases=["alias1.ha", "alias2.ha"], @@ -364,14 +364,14 @@ def test_delete_service_host(data_config_filename: PathLike): # Fail because missing CNAME with pytest.raises(AssertionError): - host_addresses = ipam.delete_service_host( + host_addresses = ipam.delete_host( hostname="ha_lo", host_addresses=input_host_addresses, cname_aliases=["alias1.ha"], service_type="LO" ) assert host_addresses is None # Fail because non-matching CNAME with pytest.raises(AssertionError): - host_addresses = ipam.delete_service_host( + host_addresses = ipam.delete_host( hostname="ha_lo", host_addresses=input_host_addresses, cname_aliases=["alias1.ha", "alias2.ha", "alias3.ha"],