Skip to content
Snippets Groups Projects
Commit 3ccf7111 authored by JORGE SASIAIN's avatar JORGE SASIAIN
Browse files

NAT-212: validate_host

parent 68639f09
Branches
Tags
1 merge request!52Feature/nat 212 213
Pipeline #83740 failed
...@@ -574,7 +574,7 @@ def delete_host( ...@@ -574,7 +574,7 @@ def delete_host(
host_addresses: HostAddresses = None, host_addresses: HostAddresses = None,
cname_aliases: Optional[list] = None, cname_aliases: Optional[list] = None,
service_type: str = "" service_type: str = ""
) -> Union[V4HostAddress, V6HostAddress]: ) -> HostAddresses:
"""Delete host record and associated CNAME records. """Delete host record and associated CNAME records.
All arguments passed to this function must match together a host record in All arguments passed to this function must match together a host record in
IPAM, and all CNAME records associated to it must also be passed exactly. IPAM, and all CNAME records associated to it must also be passed exactly.
...@@ -672,6 +672,7 @@ def validate_network( ...@@ -672,6 +672,7 @@ def validate_network(
"""Validate IPv4 or IPv6 network. """Validate IPv4 or IPv6 network.
Check if the specified network exist, and, if it does, Check if the specified network exist, and, if it does,
check if its comment field contains gso_subscription_id. check if its comment field contains gso_subscription_id.
Returns the network if validation successful.
""" """
assert network, "No network specified to validate." assert network, "No network specified to validate."
...@@ -682,3 +683,64 @@ def validate_network( ...@@ -682,3 +683,64 @@ def validate_network(
assert gso_subscription_id in network_info[0]["comment"], "GSO subscription ID does not match the one in the comment field of the IPAM network." assert gso_subscription_id in network_info[0]["comment"], "GSO subscription ID does not match the one in the comment field of the IPAM network."
return network return network
def validate_host(
hostname: str = "",
host_addresses: HostAddresses = None,
cname_aliases: Optional[list] = None,
service_type: str = ""
) -> HostAddresses:
"""Validate host.
Check if all arguments passed to this function match together a host record in
IPAM, and all CNAME records associated to it also match exactly.
Returns the host if validation successful.
"""
if cname_aliases is None:
cname_aliases = []
oss = settings.load_oss_params()
assert oss.IPAM
ipam_params = oss.IPAM
assert ipam_params.INFOBLOX
infoblox_params = ipam_params.INFOBLOX
assert hostname and host_addresses, "No host specified to validate. Either hostname or host_addresses missing."
domain_name = getattr(ipam_params, service_type).domain_name
ipv4_addr = str(host_addresses.v4)
ipv6_addr = str(host_addresses.v6)
dns_view = getattr(ipam_params, service_type).dns_view
# Find host record reference
r = requests.get(
f"{wapi(infoblox_params)}/record:host",
params={
"name": (hostname + domain_name).lower(), # hostnames are lowercase
"ipv4addr": ipv4_addr,
"ipv6addr": ipv6_addr,
"view": dns_view,
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False,
timeout=REQUESTS_TIMEOUT
)
host_data = r.json()
assert len(host_data) == 1, "Host to validate does not exist in IPAM."
assert "_ref" in host_data[0], "Host to validate does not exist in IPAM."
# Find cname records reference
r = requests.get(
f"{wapi(infoblox_params)}/record:cname",
params={
"canonical": hostname + domain_name,
"view": dns_view,
},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False,
timeout=REQUESTS_TIMEOUT
)
cname_data = r.json()
provided_cnames = [item + domain_name for item in cname_aliases]
found_cnames = [item["name"] for item in cname_data if "name" in item]
assert provided_cnames == found_cnames, "Provided CNAME alias names don't match the ones poiting to hostname."
return HostAddresses
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment