Skip to content
Snippets Groups Projects
Commit efd77610 authored by JORGE SASIAIN's avatar JORGE SASIAIN
Browse files

NAT-213: code imporvemements and tox

parent 7e0f5322
No related branches found
No related tags found
1 merge request!52Feature/nat 212 213
Pipeline #83721 failed
# mypy: ignore-errors
import ipaddress
from enum import Enum
from typing import Optional, Union
from typing import Optional, Tuple, Union
import requests
from pydantic import BaseSettings
......@@ -42,6 +42,9 @@ class IPAMErrors(Enum):
NETWORK_FULL = 400, "Cannot find 1 available IP address(es) in this network"
REQUESTS_TIMEOUT = 20
# TODO: remove this!
# lab infoblox cert is not valid for the ipv4 address
# ... disable warnings for now
......@@ -56,7 +59,7 @@ def wapi(infoblox_params: settings.InfoBloxParams):
return f"https://{infoblox_params.host}" f"/wapi/{infoblox_params.wapi_version}"
def ip_addr_version(addr):
def ip_addr_version(addr: str = ""):
ip_version = None
ip_addr = ipaddress.ip_address(addr)
if isinstance(ip_addr, ipaddress.IPv4Address):
......@@ -67,7 +70,7 @@ def ip_addr_version(addr):
return ip_version
def ip_network_version(network):
def ip_network_version(network: str = ""):
ip_version = None
ip_network = ipaddress.ip_network(network)
if isinstance(ip_network, ipaddress.IPv4Network):
......@@ -79,12 +82,12 @@ def ip_network_version(network):
def assert_host_in_service(
ipv4_addr="",
ipv6_addr="",
oss_ipv4_containers=None,
oss_ipv6_containers=None,
oss_ipv4_networks=None,
oss_ipv6_networks=None,
ipv4_addr: str = "",
ipv6_addr: str = "",
oss_ipv4_containers = None,
oss_ipv6_containers = None,
oss_ipv4_networks = None,
oss_ipv6_networks = None,
):
# IPv4
if oss_ipv4_containers:
......@@ -107,13 +110,12 @@ def assert_host_in_service(
), "Host's IPv6 address doesn't belong to service type."
def find_networks(network_container=None, network=None, ip_version=4):
"""If network_container is not None, find all networks within the specified
container.
def find_networks(network_container: Optional[str] = "", network: Optional[str] = "", ip_version: int = 4):
"""If network_container is not None, find all networks within the specified container.
Otherwise, if network is not None, find the specified network.
Otherwise find all networks.
A list of all found networks is returned (an HTTP 200 code
may be returned with an empty list.).
may be returned with an empty list).
"""
assert ip_version in [4, 6]
oss = settings.load_oss_params()
......@@ -130,18 +132,21 @@ def find_networks(network_container=None, network=None, ip_version=4):
params=params,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False,
timeout=REQUESTS_TIMEOUT
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
return r.json()
def allocate_network(
def allocate_network_inner(
infoblox_params: settings.InfoBloxParams,
network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams],
ip_version=4,
comment="",
extattrs={},
ip_version: int = 4,
comment: Optional[str] = "",
extattrs: Optional[dict] = None,
) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
if extattrs is None:
extattrs = {}
assert ip_version in [4, 6]
endpoint = "network" if ip_version == 4 else "ipv6network"
ip_container = "networkcontainer" if ip_version == 4 else "ipv6networkcontainer"
......@@ -174,6 +179,7 @@ def allocate_network(
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
headers={"content-type": "application/json"},
verify=False,
timeout=REQUESTS_TIMEOUT
)
if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL):
break
......@@ -189,29 +195,49 @@ def allocate_network(
allocated_network = r.json()["network"]
if ip_version == 4:
return V4ServiceNetwork(v4=ipaddress.ip_network(allocated_network))
else:
return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network))
return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network))
def allocate_service_ipv4_network(service_type="", comment="", extattrs={}) -> V4ServiceNetwork:
def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V4ServiceNetwork:
"""Allocate IPv4 network within the container of the specified service type."""
if extattrs is None:
extattrs = {}
oss = settings.load_oss_params()
assert oss.IPAM
ipam_params = oss.IPAM
assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type."
return allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs)
return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs)
def allocate_service_ipv6_network(service_type="", comment="", extattrs={}) -> V6ServiceNetwork:
def allocate_ipv6_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V6ServiceNetwork:
"""Allocate IPv6 network within the container of the specified service type."""
if extattrs is None:
extattrs = {}
oss = settings.load_oss_params()
assert oss.IPAM
ipam_params = oss.IPAM
assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type."
return allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6, comment, extattrs)
return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6, comment, extattrs)
def allocate_networks(
service_type: str = "",
comment: Optional[str] = "",
extattrs: Optional[dict] = None
) -> ServiceNetworks:
"""Allocate IPv4 and IPv6 network for the specified service type."""
if extattrs is None:
extattrs = {}
v4_service_network = allocate_ipv4_network(
service_type=service_type, comment=comment, extattrs=extattrs
)
v6_service_network = allocate_ipv6_network(
service_type=service_type, comment=comment, extattrs=extattrs
)
return ServiceNetworks(v4=v4_service_network.v4, v6=v6_service_network.v6)
def find_next_available_ip(infoblox_params, network_ref=""):
def find_next_available_ip(infoblox_params, network_ref: str = ""):
"""Find the next available IP address from a network given its ref.
Returns "NETWORK_FULL" if there's no space in the network.
Otherwise returns the next available IP address in the network.
......@@ -220,6 +246,7 @@ def find_next_available_ip(infoblox_params, network_ref=""):
f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1", # noqa: E501
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False,
timeout=REQUESTS_TIMEOUT
)
if match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL):
......@@ -232,8 +259,13 @@ def find_next_available_ip(infoblox_params, network_ref=""):
return received_ip[0]
def allocate_host(
hostname="", addrs=None, networks=None, cname_aliases=[], dns_view="default", extattrs={}
def allocate_host_inner(
hostname: str = "",
addrs: Optional[Tuple] = None,
networks: Optional[Tuple] = None,
cname_aliases: Optional[list] = None,
dns_view: Optional[str] = "default",
extattrs: Optional[dict] = None
) -> Union[HostAddresses, str]:
"""If networks is not None, allocate host in those networks.
Otherwise if addrs is not None, allocate host with those addresses.
......@@ -245,6 +277,10 @@ def allocate_host(
"""
# TODO: should hostnames be unique
# (i.e. fail if hostname already exists in this domain/service)?
if cname_aliases is None:
cname_aliases = []
if extattrs is None:
extattrs = {}
assert addrs or networks, "You must specify either the host addresses or the networks CIDR."
oss = settings.load_oss_params()
assert oss.IPAM.INFOBLOX
......@@ -296,6 +332,7 @@ def allocate_host(
json=req_payload,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False,
timeout=REQUESTS_TIMEOUT
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
assert isinstance(r.json(), str)
......@@ -311,6 +348,7 @@ def allocate_host(
json=cname_req_payload,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False,
timeout=REQUESTS_TIMEOUT
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
assert r.json().startswith("record:cname/")
......@@ -318,13 +356,13 @@ def allocate_host(
return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr))
def allocate_service_host(
hostname="",
service_type="",
service_networks: ServiceNetworks = None,
host_addresses: HostAddresses = None,
cname_aliases=None,
extattrs={},
def allocate_host(
hostname: str = "",
service_type: str = "",
service_networks: Optional[ServiceNetworks] = None,
host_addresses: Optional[ServiceNetworks] = None,
cname_aliases: Optional[list] = None,
extattrs: Optional[dict] = None,
) -> HostAddresses:
"""Allocate host record with both IPv4 and IPv6 address, and respective DNS
A and AAAA records.
......@@ -342,6 +380,10 @@ def allocate_service_host(
The domain name is taken from the service type and appended to the
specified hostname.
"""
if cname_aliases is None:
cname_aliases = []
if extattrs is None:
extattrs = {}
oss = settings.load_oss_params()
assert oss.IPAM
ipam_params = oss.IPAM
......@@ -369,11 +411,11 @@ def allocate_service_host(
# Use them to allocate new networks that can allocate the hosts.
# IPv4
ipv4_network = str(allocate_service_ipv4_network(service_type=service_type).v4)
ipv4_network = str(allocate_ipv4_network(service_type=service_type).v4)
assert ipv4_network, "No available space for IPv4 networks for this service type."
# IPv6
ipv6_network = str(allocate_service_ipv6_network(service_type=service_type).v6)
ipv6_network = str(allocate_ipv6_network(service_type=service_type).v6)
assert ipv6_network, "No available space for IPv6 networks for this service type."
elif oss_ipv4_networks and oss_ipv6_networks:
......@@ -386,7 +428,7 @@ def allocate_service_host(
ipv6_network_index = 0
while True:
network_tuple = (ipv4_network, ipv6_network)
host = allocate_host(
host = allocate_host_inner(
hostname=hostname + domain_name,
networks=network_tuple,
cname_aliases=cname_aliases,
......@@ -396,7 +438,7 @@ def allocate_service_host(
if "NETWORK_FULL" not in host and "NETWORK_NOT_FOUND" not in host:
break
elif "IPV4" in host:
if "IPV4" in host:
ipv4_network_index += 1
assert oss_ipv4_networks, "No available space in any IPv4 network for this service."
assert ipv4_network_index < len(
......@@ -426,7 +468,7 @@ def allocate_service_host(
else:
assert ipv6_network in oss_ipv6_networks
host = allocate_host(
host = allocate_host_inner(
hostname=hostname + domain_name,
networks=(str(ipv4_network), str(ipv6_network)),
cname_aliases=cname_aliases,
......@@ -443,7 +485,7 @@ def allocate_service_host(
ipv4_addr, ipv6_addr, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks
)
host = allocate_host(
host = allocate_host_inner(
hostname=hostname + domain_name,
addrs=(str(ipv4_addr), str(ipv6_addr)),
cname_aliases=cname_aliases,
......@@ -455,8 +497,9 @@ def allocate_service_host(
return host
def delete_service_network(
network: Union[V4ServiceNetwork, V6ServiceNetwork] = None, service_type=""
def delete_network(
network: Union[V4ServiceNetwork, V6ServiceNetwork] = None,
service_type: str = ""
) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
"""Delete IPv4 or IPv6 network by CIDR."""
oss = settings.load_oss_params()
......@@ -499,27 +542,31 @@ def delete_service_network(
f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}',
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False,
timeout=REQUESTS_TIMEOUT
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
# Extract ipv4/ipv6 address from the network reference obtained in the
# response
r_text = r.text
print(r_text)
network_address = ipaddress.ip_network(r_text.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":"))
if ip_version == 4:
return V4ServiceNetwork(v4=ipaddress.ip_network(network_address))
else:
return V6ServiceNetwork(v6=ipaddress.ip_network(network_address))
return V6ServiceNetwork(v6=ipaddress.ip_network(network_address))
def delete_service_host(
hostname="", host_addresses: HostAddresses = None, cname_aliases=[], service_type=""
def delete_host(
hostname: str = "",
host_addresses: HostAddresses = None,
cname_aliases: Optional[list] = None,
service_type: str = ""
) -> Union[V4HostAddress, V6HostAddress]:
"""Delete host record and associated CNAME records.
All arguments passed to this function must match together a host record in
IPAM, and all CNAME records associated to it must also be passed exactly.
"""
if cname_aliases is None:
cname_aliases = []
oss = settings.load_oss_params()
assert oss.IPAM
ipam_params = oss.IPAM
......@@ -556,6 +603,7 @@ def delete_service_host(
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False,
timeout=REQUESTS_TIMEOUT
)
host_data = r.json()
assert len(host_data) == 1, "Host does not exist."
......@@ -571,6 +619,7 @@ def delete_service_host(
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False,
timeout=REQUESTS_TIMEOUT
)
cname_data = r.json()
provided_cnames = [item + domain_name for item in cname_aliases]
......@@ -582,6 +631,7 @@ def delete_service_host(
f"{wapi(infoblox_params)}/{host_ref}",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False,
timeout=REQUESTS_TIMEOUT
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
......@@ -592,39 +642,8 @@ def delete_service_host(
f"{wapi(infoblox_params)}/{cname_ref}",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False,
timeout=REQUESTS_TIMEOUT
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
return host_addresses
def new_service_networks(service_type: str = "", comment: str = "", extattrs: dict = None) -> ServiceNetworks:
if extattrs is None:
extattrs = {}
v4_service_network = allocate_service_ipv4_network(
service_type=service_type, comment=comment, extattrs=extattrs
)
v6_service_network = allocate_service_ipv6_network(
service_type=service_type, comment=comment, extattrs=extattrs
)
return ServiceNetworks(v4=v4_service_network.v4, v6=v6_service_network.v6)
def new_service_host(
hostname: str,
service_type: str = "",
service_networks: Optional[ServiceNetworks] = None,
host_addresses: Optional[HostAddresses] = None,
cname_aliases: list = None,
extattrs: dict = None,
) -> HostAddresses:
if extattrs is None:
extattrs = {}
return allocate_service_host(
hostname=hostname,
service_type=service_type,
service_networks=service_networks,
host_addresses=host_addresses,
cname_aliases=cname_aliases,
extattrs=extattrs,
)
......@@ -17,7 +17,7 @@ from gso.products.product_blocks import device as device_pb
from gso.products.product_types import device
from gso.products.product_types.device import DeviceInactive, DeviceProvisioning
from gso.products.product_types.site import Site
from gso.services import _ipam, provisioning_proxy
from gso.services import ipam, provisioning_proxy
from gso.services.provisioning_proxy import await_pp_results, confirm_pp_results
......@@ -76,17 +76,17 @@ def iso_from_ipv4(ipv4_address: ipaddress.IPv4Address) -> str:
def get_info_from_ipam(subscription: DeviceProvisioning) -> State:
lo0_alias = re.sub(".geant.net", "", subscription.device.device_fqdn)
lo0_name = f"lo0.{lo0_alias}"
lo0_addr = _ipam.allocate_service_host(hostname=lo0_name, service_type="LO", cname_aliases=[lo0_alias])
lo0_addr = ipam.allocate_host(hostname=lo0_name, service_type="LO", cname_aliases=[lo0_alias])
subscription.device.device_lo_ipv4_address = lo0_addr.v4
subscription.device.device_lo_ipv6_address = lo0_addr.v6
subscription.device.device_lo_iso_address = iso_from_ipv4(subscription.device.device_lo_ipv4_address)
subscription.device.device_si_ipv4_network = _ipam.allocate_service_ipv4_network(
subscription.device.device_si_ipv4_network = ipam.allocate_ipv4_network(
service_type="SI", comment=f"SI for {lo0_name}"
).v4
subscription.device.device_ias_lt_ipv4_network = _ipam.allocate_service_ipv4_network(
subscription.device.device_ias_lt_ipv4_network = ipam.allocate_ipv4_network(
service_type="LT_IAS", comment=f"LT for {lo0_name}"
).v4
subscription.device.device_ias_lt_ipv6_network = _ipam.allocate_service_ipv6_network(
subscription.device.device_ias_lt_ipv6_network = ipam.allocate_ipv6_network(
service_type="LT_IAS", comment=f"LT for {lo0_name}"
).v6
return {"subscription": subscription}
......
......@@ -40,7 +40,7 @@ def deprovision_loopback_ips(subscription: Device) -> dict[str, V4HostAddress |
fqdn_as_list = subscription.device.device_fqdn.split(".")
hostname = str(fqdn_as_list[0]) + "." + str(fqdn_as_list[1]) + "." + str(fqdn_as_list[2])
lo0_name = "lo0." + hostname
host_addresses = ipam.delete_service_host(
host_addresses = ipam.delete_host(
hostname=lo0_name,
host_addresses=input_host_addresses,
cname_aliases=[hostname],
......@@ -51,7 +51,7 @@ def deprovision_loopback_ips(subscription: Device) -> dict[str, V4HostAddress |
@step("Deprovision SI- interface IPs from IPAM/DNS")
def deprovision_si_ips(subscription: Device) -> dict[str, V4ServiceNetwork | V6ServiceNetwork]:
service_network = ipam.delete_service_network(
service_network = ipam.delete_network(
network=ipaddress.ip_network(subscription.device.device_si_ipv4_network),
service_type="SI",
)
......@@ -60,11 +60,11 @@ def deprovision_si_ips(subscription: Device) -> dict[str, V4ServiceNetwork | V6S
@step("Deprovision LT- interface (IAS) IPs from IPAM/DNS")
def deprovision_lt_ips(subscription: Device) -> dict[str, V4ServiceNetwork | V6ServiceNetwork]:
service_network_v4 = ipam.delete_service_network(
service_network_v4 = ipam.delete_network(
network=ipaddress.ip_network(subscription.device.device_ias_lt_ipv4_network),
service_type="LT_IAS",
)
service_network_v6 = ipam.delete_service_network(
service_network_v6 = ipam.delete_network(
network=ipaddress.ip_network(subscription.device.device_ias_lt_ipv6_network),
service_type="LT_IAS",
)
......
......@@ -13,7 +13,7 @@ from gso.products.product_blocks import PhyPortCapacity
from gso.products.product_blocks.iptrunk import IptrunkType
from gso.products.product_types.device import Device
from gso.products.product_types.iptrunk import IptrunkInactive, IptrunkProvisioning
from gso.services import _ipam, provisioning_proxy
from gso.services import ipam, provisioning_proxy
from gso.services.provisioning_proxy import await_pp_results, confirm_pp_results
......@@ -98,11 +98,11 @@ def create_subscription(product: UUIDstr) -> State:
@step("Get information from IPAM")
def get_info_from_ipam(subscription: IptrunkProvisioning) -> State:
# TODO: get info about how these should be generated
subscription.iptrunk.iptrunk_ipv4_network = _ipam.allocate_service_ipv4_network(
subscription.iptrunk.iptrunk_ipv4_network = ipam.allocate_ipv4_network(
service_type="TRUNK",
comment=subscription.iptrunk.iptrunk_description,
).v4
subscription.iptrunk.iptrunk_ipv6_network = _ipam.allocate_service_ipv6_network(
subscription.iptrunk.iptrunk_ipv6_network = ipam.allocate_ipv6_network(
service_type="TRUNK",
comment=subscription.iptrunk.iptrunk_description,
).v6
......
......@@ -65,7 +65,7 @@ def deprovision_ip_trunk_real(subscription: Iptrunk, process_id: UUIDstr) -> Sta
@step("Deprovision IPv4 networks")
def deprovision_ip_trunk_ipv4(subscription: Iptrunk) -> dict[str, V4ServiceNetwork | V6ServiceNetwork]:
service_network = ipam.delete_service_network(
service_network = ipam.delete_network(
network=ipaddress.ip_network(subscription.iptrunk.iptrunk_ipv4_network),
service_type="TRUNK",
)
......@@ -74,7 +74,7 @@ def deprovision_ip_trunk_ipv4(subscription: Iptrunk) -> dict[str, V4ServiceNetwo
@step("Deprovision IPv6 networks")
def deprovision_ip_trunk_ipv6(subscription: Iptrunk) -> dict[str, V4ServiceNetwork | V6ServiceNetwork]:
service_network = ipam.delete_service_network(
service_network = ipam.delete_network(
network=ipaddress.ip_network(subscription.iptrunk.iptrunk_ipv6_network),
service_type="TRUNK",
)
......
......@@ -9,7 +9,7 @@ from gso.services import ipam
@responses.activate
def test_new_service_networks(data_config_filename: PathLike):
def test_allocate_networks(data_config_filename: PathLike):
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/network.*"),
......@@ -28,19 +28,19 @@ def test_new_service_networks(data_config_filename: PathLike):
},
)
service_networks = ipam.new_service_networks(service_type="TRUNK")
service_networks = ipam.allocate_networks(service_type="TRUNK")
assert service_networks == ipam.ServiceNetworks(
v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128")
)
# should fail because this service type has networks instead of containers
with pytest.raises(AssertionError):
service_networks = ipam.new_service_networks(service_type="LO")
service_networks = ipam.allocate_networks(service_type="LO")
assert service_networks is None
@responses.activate
def test_new_service_host(data_config_filename: PathLike):
def test_allocate_host(data_config_filename: PathLike):
responses.add(
method=responses.POST,
url=re.compile(r".*/wapi.*/record:host$"),
......@@ -135,7 +135,7 @@ def test_new_service_host(data_config_filename: PathLike):
)
# test host creation by IP addresses
service_hosts = ipam.new_service_host(
service_hosts = ipam.allocate_host(
hostname="test",
service_type="TRUNK",
host_addresses=ipam.HostAddresses(
......@@ -147,7 +147,7 @@ def test_new_service_host(data_config_filename: PathLike):
)
# test host creation by network addresses
service_hosts = ipam.new_service_host(
service_hosts = ipam.allocate_host(
hostname="test",
service_type="TRUNK",
service_networks=ipam.ServiceNetworks(
......@@ -159,20 +159,20 @@ def test_new_service_host(data_config_filename: PathLike):
)
# test host creation by just service_type when service cfg uses networks
service_hosts = ipam.new_service_host(hostname="test", service_type="LO")
service_hosts = ipam.allocate_host(hostname="test", service_type="LO")
assert service_hosts == ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18")
)
# test host creation by just service_type when service cfg uses containers
service_hosts = ipam.new_service_host(hostname="test", service_type="TRUNK")
service_hosts = ipam.allocate_host(hostname="test", service_type="TRUNK")
assert service_hosts == ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18")
)
# test host creation that should return a no available IP error
with pytest.raises(AssertionError):
service_hosts = ipam.new_service_host(
service_hosts = ipam.allocate_host(
hostname="test",
service_type="TRUNK",
service_networks=ipam.ServiceNetworks(
......@@ -183,7 +183,7 @@ def test_new_service_host(data_config_filename: PathLike):
# test host creation that should return a network not exist error
with pytest.raises(AssertionError):
service_hosts = ipam.new_service_host(
service_hosts = ipam.allocate_host(
hostname="test",
service_type="TRUNK",
service_networks=ipam.ServiceNetworks(
......@@ -194,7 +194,7 @@ def test_new_service_host(data_config_filename: PathLike):
@responses.activate
def test_delete_service_network(data_config_filename: PathLike):
def test_delete_network(data_config_filename: PathLike):
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"),
......@@ -267,29 +267,29 @@ def test_delete_service_network(data_config_filename: PathLike):
body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501
)
service_network = ipam.delete_service_network(network=ipaddress.ip_network("10.255.255.0/26"), service_type="LO")
service_network = ipam.delete_network(network=ipaddress.ip_network("10.255.255.0/26"), service_type="LO")
assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26"))
with pytest.raises(AssertionError):
service_network = ipam.delete_service_network(
service_network = ipam.delete_network(
network=ipaddress.ip_network("10.255.255.20/32"), service_type="LO"
)
assert service_network is None
service_network = ipam.delete_service_network(
service_network = ipam.delete_network(
network=ipaddress.ip_network("dead:beef::18/128"), service_type="TRUNK"
)
assert service_network == ipam.V6ServiceNetwork(v6=ipaddress.ip_network("dead:beef::18/128"))
with pytest.raises(AssertionError):
service_network = ipam.delete_service_network(
service_network = ipam.delete_network(
network=ipaddress.ip_network("beef:dead::18/128"), service_type="TRUNK"
)
assert service_network is None
@responses.activate
def test_delete_service_host(data_config_filename: PathLike):
def test_delete_host(data_config_filename: PathLike):
responses.add(
method=responses.GET,
url=re.compile(r".*/wapi.*record:host.*"),
......@@ -352,7 +352,7 @@ def test_delete_service_host(data_config_filename: PathLike):
input_host_addresses = ipam.HostAddresses(
v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1")
)
host_addresses = ipam.delete_service_host(
host_addresses = ipam.delete_host(
hostname="ha_lo",
host_addresses=input_host_addresses,
cname_aliases=["alias1.ha", "alias2.ha"],
......@@ -364,14 +364,14 @@ def test_delete_service_host(data_config_filename: PathLike):
# Fail because missing CNAME
with pytest.raises(AssertionError):
host_addresses = ipam.delete_service_host(
host_addresses = ipam.delete_host(
hostname="ha_lo", host_addresses=input_host_addresses, cname_aliases=["alias1.ha"], service_type="LO"
)
assert host_addresses is None
# Fail because non-matching CNAME
with pytest.raises(AssertionError):
host_addresses = ipam.delete_service_host(
host_addresses = ipam.delete_host(
hostname="ha_lo",
host_addresses=input_host_addresses,
cname_aliases=["alias1.ha", "alias2.ha", "alias3.ha"],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment