Skip to content
Snippets Groups Projects

Feature/nat 212 213

Merged JORGE SASIAIN requested to merge feature/nat-212-213 into develop
4 files
+ 47
58
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 38
41
@@ -84,10 +84,10 @@ def ip_network_version(network: str = ""):
def assert_host_in_service(
ipv4_addr: str = "",
ipv6_addr: str = "",
oss_ipv4_containers = None,
oss_ipv6_containers = None,
oss_ipv4_networks = None,
oss_ipv6_networks = None
oss_ipv4_containers=None,
oss_ipv6_containers=None,
oss_ipv4_networks=None,
oss_ipv6_networks=None,
):
# IPv4
if oss_ipv4_containers:
@@ -113,10 +113,10 @@ def assert_host_in_service(
def assert_network_in_service(
ipv4_network: Optional[V4ServiceNetwork] = None,
ipv6_network: Optional[V6ServiceNetwork] = None,
oss_ipv4_containers = None,
oss_ipv6_containers = None,
oss_ipv4_networks = None,
oss_ipv6_networks = None
oss_ipv4_containers=None,
oss_ipv6_containers=None,
oss_ipv4_networks=None,
oss_ipv6_networks=None,
):
# IPv4
if ipv4_network:
@@ -166,7 +166,7 @@ def find_networks(network_container: Optional[str] = "", network: Optional[str]
params=params,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
return r.json()
@@ -213,7 +213,7 @@ def allocate_network_inner(
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
headers={"content-type": "application/json"},
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL):
break
@@ -232,7 +232,9 @@ def allocate_network_inner(
return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network))
def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V4ServiceNetwork:
def allocate_ipv4_network(
service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None
) -> V4ServiceNetwork:
"""Allocate IPv4 network within the container of the specified service type.
Args:
@@ -255,7 +257,9 @@ def allocate_ipv4_network(service_type: str = "", comment: Optional[str] = "", e
return allocate_network_inner(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs)
def allocate_ipv6_network(service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None) -> V6ServiceNetwork:
def allocate_ipv6_network(
service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None
) -> V6ServiceNetwork:
"""Allocate IPv6 network within the container of the specified service type.
Args:
@@ -279,19 +283,13 @@ def allocate_ipv6_network(service_type: str = "", comment: Optional[str] = "", e
def allocate_networks(
service_type: str = "",
comment: Optional[str] = "",
extattrs: Optional[dict] = None
service_type: str = "", comment: Optional[str] = "", extattrs: Optional[dict] = None
) -> ServiceNetworks:
"""Allocate IPv4 and IPv6 network for the specified service type."""
if extattrs is None:
extattrs = {}
v4_service_network = allocate_ipv4_network(
service_type=service_type, comment=comment, extattrs=extattrs
)
v6_service_network = allocate_ipv6_network(
service_type=service_type, comment=comment, extattrs=extattrs
)
v4_service_network = allocate_ipv4_network(service_type=service_type, comment=comment, extattrs=extattrs)
v6_service_network = allocate_ipv6_network(service_type=service_type, comment=comment, extattrs=extattrs)
return ServiceNetworks(v4=v4_service_network.v4, v6=v6_service_network.v6)
@@ -312,7 +310,7 @@ def find_next_available_ip(infoblox_params, network_ref: str = ""):
f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
if match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL):
@@ -331,9 +329,8 @@ def allocate_host_inner( # noqa: C901
networks: Optional[Tuple] = None,
cname_aliases: Optional[list] = None,
dns_view: Optional[str] = "default",
extattrs: Optional[dict] = None
extattrs: Optional[dict] = None,
) -> Union[HostAddresses, str]:
# TODO: should hostnames be unique
# (i.e. fail if hostname already exists in this domain/service)?
if cname_aliases is None:
@@ -395,7 +392,7 @@ def allocate_host_inner( # noqa: C901
json=req_payload,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
assert isinstance(r.json(), str)
@@ -411,7 +408,7 @@ def allocate_host_inner( # noqa: C901
json=cname_req_payload,
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
assert r.json().startswith("record:cname/")
@@ -433,7 +430,7 @@ def allocate_host( # noqa: C901
----
hostname (str): hostname of the host (without domain name, which is taken from the service type)
service_type (str): the name of the service type (e.g. "TRUNK")
service_networks (ServiceNetworks, optional): ipv4 and ipv6 network to allocate host, must be in a valid container for the service type
service_networks (ServiceNetworks, optional): ipv4 and ipv6 network to allocate host
host_addresses (HostAddresses, optional): ipv4 and ipv6 addresses to allocate host (service_networks has precedence)
cname_aliases (list, optional): to create cname records in addition to the host record
extattrs (dict, optional): any extensible attributes to add in IPAM (e.g. "Site": {"value": "dummy"})
@@ -558,8 +555,7 @@ def allocate_host( # noqa: C901
def delete_network(
network: ipaddress.ip_network = None,
service_type: str = ""
network: ipaddress.ip_network = None, service_type: str = ""
) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
"""Delete IPv4 or IPv6 network by CIDR."""
oss = settings.load_oss_params()
@@ -597,7 +593,7 @@ def delete_network(
f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}',
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
@@ -614,7 +610,7 @@ def delete_host(
hostname: str = "",
host_addresses: HostAddresses = None,
cname_aliases: Optional[list] = None,
service_type: str = ""
service_type: str = "",
) -> HostAddresses:
"""Delete host record and associated CNAME records.
@@ -660,7 +656,7 @@ def delete_host(
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
host_data = r.json()
assert len(host_data) == 1, "Host to delete does not exist in IPAM."
@@ -676,7 +672,7 @@ def delete_host(
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
cname_data = r.json()
provided_cnames = [item + domain_name for item in cname_aliases]
@@ -688,7 +684,7 @@ def delete_host(
f"{wapi(infoblox_params)}/{host_ref}",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
@@ -699,7 +695,7 @@ def delete_host(
f"{wapi(infoblox_params)}/{cname_ref}",
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
@@ -707,8 +703,7 @@ def delete_host(
def validate_network(
gso_subscription_id: str = "",
network: ipaddress.ip_network = None
gso_subscription_id: str = "", network: ipaddress.ip_network = None
) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
"""Validate IPv4 or IPv6 network.
@@ -721,7 +716,9 @@ def validate_network(
network_info = find_networks(network=str(network), ip_version=ip_version)
assert len(network_info) == 1 and "_ref" in network_info[0], "Network to validate not found in IPAM."
assert "comment" in network_info[0], "Network to validate does not have comment in IPAM."
assert gso_subscription_id in network_info[0]["comment"], "GSO subscription ID does not match the one in the comment field of the IPAM network."
assert (
gso_subscription_id in network_info[0]["comment"]
), "GSO subscription ID does not match the one in the comment field of the IPAM network."
if ip_version == 4:
return V4ServiceNetwork(v4=network)
@@ -732,7 +729,7 @@ def validate_host(
hostname: str = "",
host_addresses: HostAddresses = None,
cname_aliases: Optional[list] = None,
service_type: str = ""
service_type: str = "",
) -> HostAddresses:
"""Validate host.
@@ -765,7 +762,7 @@ def validate_host(
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
host_data = r.json()
assert len(host_data) == 1, "Host to validate does not exist in IPAM."
@@ -780,7 +777,7 @@ def validate_host(
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False, # noqa: S501
timeout=REQUESTS_TIMEOUT
timeout=REQUESTS_TIMEOUT,
)
cname_data = r.json()
provided_cnames = [item + domain_name for item in cname_aliases]
Loading