Skip to content
Snippets Groups Projects
Commit 551aa18c authored by JORGE SASIAIN's avatar JORGE SASIAIN
Browse files

NAT-199: implement delete host in IPAM

parent 4ce8282e
Branches
Tags
2 merge requests!27Merge develop into NAT-185,!15Nat 185
...@@ -81,6 +81,32 @@ def _ip_network_version(network): ...@@ -81,6 +81,32 @@ def _ip_network_version(network):
return ip_version return ip_version
def _assert_host_in_service(
ipv4_addr='', ipv6_addr='',
oss_ipv4_containers=None, oss_ipv6_containers=None,
oss_ipv4_networks=None, oss_ipv6_networks=None
):
# IPv4
if oss_ipv4_containers:
assert any(ipv4_addr in oss_ipv4_container
for oss_ipv4_container in oss_ipv4_containers), \
"Host's IPv4 address doesn't belong to service type."
else:
assert any(ipv4_addr in oss_ipv4_network
for oss_ipv4_network in oss_ipv4_networks), \
"Host's IPv4 address doesn't belong to service type."
# IPv6
if oss_ipv6_containers:
assert any(ipv6_addr in oss_ipv6_container
for oss_ipv6_container in oss_ipv6_containers), \
"Host's IPv6 address doesn't belong to service type."
else:
assert any(ipv6_addr in oss_ipv6_network
for oss_ipv6_network in oss_ipv6_networks), \
"Host's IPv6 address doesn't belong to service type."
def _find_networks(network_container=None, network=None, ip_version=4): def _find_networks(network_container=None, network=None, ip_version=4):
""" """
If network_container is not None, find all networks within the specified If network_container is not None, find all networks within the specified
...@@ -248,7 +274,10 @@ def _allocate_host(hostname='', ...@@ -248,7 +274,10 @@ def _allocate_host(hostname='',
If networks is not None, allocate host in those networks. If networks is not None, allocate host in those networks.
Otherwise if addrs is not None, allocate host with those addresses. Otherwise if addrs is not None, allocate host with those addresses.
hostname parameter must be full name including domain name. hostname parameter must be full name including domain name.
Return an error string if couldn't allocate host due to network full. Return "IPV4_NETWORK_FULL" or "IPV6_NETWORK_FULL"
if couldn't allocate host due to requested network being full.
Return "IPV4_NETWORK_NOT_FOUND" or "IPV6_NETWORK_NOT_FOUND"
if couldn't allocate host due to requested network not existing.
""" """
# TODO: should hostnames be unique # TODO: should hostnames be unique
# (i.e. fail if hostname already exists in this domain/service)? # (i.e. fail if hostname already exists in this domain/service)?
...@@ -473,23 +502,13 @@ def allocate_service_host(hostname='', ...@@ -473,23 +502,13 @@ def allocate_service_host(hostname='',
"Network does not exist. Create it first." "Network does not exist. Create it first."
elif host_addresses: elif host_addresses:
# IPv4
ipv4_addr = host_addresses.v4 ipv4_addr = host_addresses.v4
if oss_ipv4_containers:
assert any(ipv4_addr in oss_ipv4_container
for oss_ipv4_container in oss_ipv4_containers)
else:
assert any(ipv4_addr in oss_ipv4_network
for oss_ipv4_network in oss_ipv4_networks)
# IPv6
ipv6_addr = host_addresses.v6 ipv6_addr = host_addresses.v6
if oss_ipv6_containers: _assert_host_in_service(
assert any(ipv6_addr in oss_ipv6_container ipv4_addr, ipv6_addr,
for oss_ipv6_container in oss_ipv6_containers) oss_ipv4_containers, oss_ipv6_containers,
else: oss_ipv4_networks, oss_ipv6_networks
assert any(ipv6_addr in oss_ipv6_network )
for oss_ipv6_network in oss_ipv6_networks)
host = _allocate_host( host = _allocate_host(
hostname=hostname+domain_name, hostname=hostname+domain_name,
...@@ -569,73 +588,98 @@ def delete_service_network(ipnetwork=None, service_type='' ...@@ -569,73 +588,98 @@ def delete_service_network(ipnetwork=None, service_type=''
return V6ServiceNetwork(v6=ipaddress.ip_network(network_address)) return V6ServiceNetwork(v6=ipaddress.ip_network(network_address))
# def delete_service_host( def delete_service_host(
# hostname='', hostname='',
# host_addresses: HostAddresses = None, host_addresses: HostAddresses = None,
# cname_aliases=[], cname_aliases=[],
# service_type='' service_type=''
# ) -> Union[V4HostAddress, V6HostAddress]: ) -> Union[V4HostAddress, V6HostAddress]:
# """ """
# Delete IPv4 or IPv6 host by its address. Delete host record and associated CNAME records.
# """ All arguments passed to this function must match together a host record in
# oss = settings.load_oss_params() IPAM, and all CNAME records associated to it must also be passed exactly.
# assert oss.IPAM.INFOBLOX """
# infoblox_params = oss.IPAM.INFOBLOX oss = settings.load_oss_params()
assert oss.IPAM
# ip_version = _ip_addr_version(addr) ipam_params = oss.IPAM
# ip_param = 'ipv4addr' if ip_version == 4 else 'ipv6addr' assert ipam_params.INFOBLOX
infoblox_params = ipam_params.INFOBLOX
# # Find host record reference
# r = requests.get( assert hasattr(ipam_params, service_type) \
# f'{_wapi(infoblox_params)}/record:host', and service_type != 'INFOBLOX', "Invalid service type."
# params={ip_param: addr}, oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers
# auth=HTTPBasicAuth(infoblox_params.username, oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers
# infoblox_params.password), oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks
# verify=False oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks
# ) domain_name = getattr(ipam_params, service_type).domain_name
# host_data = r.json() dns_view = getattr(ipam_params, service_type).dns_view
# assert len(host_data) == 1, "Host does not exist." ipv4_addr = str(host_addresses.v4)
# assert '_ref' in host_data[0] ipv6_addr = str(host_addresses.v6)
# host_ref = host_data[0]['_ref']
_assert_host_in_service(
# # Delete it host_addresses.v4, host_addresses.v6,
# r = requests.delete( oss_ipv4_containers, oss_ipv6_containers,
# f'{_wapi(infoblox_params)}/{host_ref}', oss_ipv4_networks, oss_ipv6_networks
# auth=HTTPBasicAuth(infoblox_params.username, )
# infoblox_params.password),
# verify=False # Find host record reference
# ) r = requests.get(
# assert r.status_code >= 200 and r.status_code < 300, \ f'{_wapi(infoblox_params)}/record:host',
# f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" params={
'name': (hostname+domain_name).lower(), # hostnames are lowercase
# # Also find and delete the associated dns a/aaaa record 'ipv4addr': ipv4_addr,
# endpoint = 'record:a' if ip_version == 4 else 'record:aaaa' 'ipv6addr': ipv6_addr,
'view': dns_view,
# r = requests.get( },
# f'{_wapi(infoblox_params)}/{endpoint}', auth=HTTPBasicAuth(infoblox_params.username,
# params={ip_param: addr}, infoblox_params.password),
# auth=HTTPBasicAuth(infoblox_params.username, verify=False
# infoblox_params.password), )
# verify=False host_data = r.json()
# ) assert len(host_data) == 1, "Host does not exist."
# dns_data = r.json() assert '_ref' in host_data[0]
# assert len(dns_data) == 1, "DNS record does not exist." host_ref = host_data[0]['_ref']
# assert '_ref' in dns_data[0]
# dns_ref = dns_data[0]['_ref'] # Find cname records reference
r = requests.get(
# r = requests.delete( f'{_wapi(infoblox_params)}/record:cname',
# f'{_wapi(infoblox_params)}/{dns_ref}', params={
# auth=HTTPBasicAuth(infoblox_params.username, 'canonical': hostname+domain_name,
# infoblox_params.password), "view": dns_view,
# verify=False },
# ) auth=HTTPBasicAuth(infoblox_params.username,
# assert r.status_code >= 200 and r.status_code < 300, \ infoblox_params.password),
# f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" verify=False
)
# if ip_version == 4: cname_data = r.json()
# return V4HostAddress(v4=addr) provided_cnames = [item + domain_name for item in cname_aliases]
# else: found_cnames = [item['name'] for item in cname_data if 'name' in item]
# return V6HostAddress(v6=addr) assert provided_cnames == found_cnames, \
"Provided CNAME alias names don't match the ones poiting to hostname."
# Delete the host record
r = requests.delete(
f'{_wapi(infoblox_params)}/{host_ref}',
auth=HTTPBasicAuth(infoblox_params.username,
infoblox_params.password),
verify=False
)
assert r.status_code >= 200 and r.status_code < 300, \
f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
# Delete the CNAME records
cname_refs = [item['_ref'] for item in cname_data if 'name' in item]
for cname_ref in cname_refs:
r = requests.delete(
f'{_wapi(infoblox_params)}/{cname_ref}',
auth=HTTPBasicAuth(infoblox_params.username,
infoblox_params.password),
verify=False
)
assert r.status_code >= 200 and r.status_code < 300, \
f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
return host_addresses
""" """
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment