Skip to content
Snippets Groups Projects
Commit 68639f09 authored by JORGE SASIAIN's avatar JORGE SASIAIN
Browse files

NAT-212/213: more code improvements, and validate_network method

parent efd77610
Branches
Tags
1 merge request!52Feature/nat 212 213
Pipeline #83732 failed
......@@ -87,7 +87,7 @@ def assert_host_in_service(
oss_ipv4_containers = None,
oss_ipv6_containers = None,
oss_ipv4_networks = None,
oss_ipv6_networks = None,
oss_ipv6_networks = None
):
# IPv4
if oss_ipv4_containers:
......@@ -110,6 +110,33 @@ def assert_host_in_service(
), "Host's IPv6 address doesn't belong to service type."
def assert_network_in_service(
ipv4_network: Optional[V4ServiceNetwork] = None,
ipv6_network: Optional[V6ServiceNetwork] = None,
oss_ipv4_containers = None,
oss_ipv6_containers = None,
oss_ipv4_networks = None,
oss_ipv6_networks = None
):
# IPv4
if ipv4_network:
if oss_ipv4_containers:
assert any(
ipv4_network.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers
), "Network doesn't belong to service type."
else:
assert ipv4_network in oss_ipv4_networks, "Network doesn't belong to service type."
# IPv6
if ipv6_network:
if oss_ipv6_containers:
assert any(
ipv6_network.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers
), "Network doesn't belong to service type."
else:
assert ipv6_network in oss_ipv6_networks, "Network doesn't belong to service type."
def find_networks(network_container: Optional[str] = "", network: Optional[str] = "", ip_version: int = 4):
"""If network_container is not None, find all networks within the specified container.
Otherwise, if network is not None, find the specified network.
......@@ -126,7 +153,7 @@ def find_networks(network_container: Optional[str] = "", network: Optional[str]
if network_container:
params = {"network_container": network_container}
elif network:
params = {"network": network}
params = {"network": network, "_return_fields": "comment"}
r = requests.get(
f"{wapi(infoblox_params)}/{endpoint}",
params=params,
......@@ -281,7 +308,7 @@ def allocate_host_inner(
cname_aliases = []
if extattrs is None:
extattrs = {}
assert addrs or networks, "You must specify either the host addresses or the networks CIDR."
assert addrs or networks, "Neither networks nor host addresses could be derived to allocate host."
oss = settings.load_oss_params()
assert oss.IPAM.INFOBLOX
infoblox_params = oss.IPAM.INFOBLOX
......@@ -454,19 +481,11 @@ def allocate_host(
ipv6_network = str(oss_ipv6_networks[ipv6_network_index])
elif service_networks:
# IPv4
ipv4_network = service_networks.v4
if oss_ipv4_containers:
assert any(ipv4_network.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers)
else:
assert ipv4_network in oss_ipv4_networks
# IPv6
ipv6_network = service_networks.v6
if oss_ipv6_containers:
assert any(ipv6_network.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers)
else:
assert ipv6_network in oss_ipv6_networks
assert_network_in_service(
ipv4_network, ipv6_network, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks
)
host = allocate_host_inner(
hostname=hostname + domain_name,
......@@ -476,7 +495,7 @@ def allocate_host(
extattrs=extattrs,
)
assert "NETWORK_FULL" not in host, "Network is full."
assert "NETWORK_NOT_FOUND" not in host, "Network does not exist. Create it first."
assert "NETWORK_NOT_FOUND" not in host, "Network does not exist in IPAM. Create it first."
elif host_addresses:
ipv4_addr = host_addresses.v4
......@@ -508,35 +527,30 @@ def delete_network(
assert ipam_params.INFOBLOX
infoblox_params = ipam_params.INFOBLOX
assert network, "No network specified to delete."
assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type."
ip_version = ip_network_version(str(network))
# Ensure that the network to be deleted is under the service type.
# Otherwise user is not allowed to delete it
oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers
oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers
oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks
oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks
ipv4_network = None
ipv6_network = None
if ip_version == 4:
oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers
oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks
if oss_ipv4_containers:
assert any(
network.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers
), "Can't delete: network doesn't belong to service type."
else:
assert network in oss_ipv4_networks, "Can't delete: network doesn't belong to service type."
ipv4_network = network
else:
oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers
oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks
if oss_ipv6_containers:
assert any(
network.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers
), "Can't delete: network doesn't belong to service type."
else:
assert network in oss_ipv6_networks, "Can't delete: network doesn't belong to service type."
ipv6_network = network
assert_network_in_service(
ipv4_network, ipv6_network, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks
)
network_info = find_networks(network=str(network), ip_version=ip_version)
assert len(network_info) == 1, "Network does not exist."
assert "_ref" in network_info[0]
assert len(network_info) == 1, "Network to delete does not exist in IPAM."
assert "_ref" in network_info[0], "Network to delete does not exist in IPAM."
r = requests.delete(
f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}',
......@@ -573,6 +587,7 @@ def delete_host(
assert ipam_params.INFOBLOX
infoblox_params = ipam_params.INFOBLOX
assert host_addresses, "No host specified to delete."
assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type."
oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers
oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers
......@@ -606,8 +621,8 @@ def delete_host(
timeout=REQUESTS_TIMEOUT
)
host_data = r.json()
assert len(host_data) == 1, "Host does not exist."
assert "_ref" in host_data[0]
assert len(host_data) == 1, "Host to delete does not exist in IPAM."
assert "_ref" in host_data[0], "Host to delete does not exist in IPAM."
host_ref = host_data[0]["_ref"]
# Find cname records reference
......@@ -647,3 +662,23 @@ def delete_host(
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
return host_addresses
def validate_network(
gso_subscription_id: str = "",
network: Union[V4ServiceNetwork, V6ServiceNetwork] = None,
service_type: str = ""
) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
"""Validate IPv4 or IPv6 network.
Check if the specified network exist, and, if it does,
check if its comment field contains gso_subscription_id.
"""
assert network, "No network specified to validate."
ip_version = ip_network_version(str(network))
network_info = find_networks(network=str(network), ip_version=ip_version)
assert len(network_info) == 1 and "_ref" in network_info[0], "Network to validate not found in IPAM."
assert "comment" in network_info[0], "Network to validate does not have comment in IPAM."
assert gso_subscription_id in network_info[0]["comment"], "GSO subscription ID does not match the one in the comment field of the IPAM network."
return network
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment