Skip to content
Snippets Groups Projects
Commit 1c5f45a3 authored by JORGE SASIAIN's avatar JORGE SASIAIN
Browse files

NAT-185: combine IPv4/IPv6/DNS in host record and update tests

parent 8e6ef232
No related branches found
No related tags found
2 merge requests!27Merge develop into NAT-185,!15Nat 185
...@@ -109,39 +109,6 @@ def _find_networks(network_container=None, network=None, ip_version=4): ...@@ -109,39 +109,6 @@ def _find_networks(network_container=None, network=None, ip_version=4):
return r.json() return r.json()
def _get_network_capacity(network=None):
"""
Get utilization of a IPv4 network in a fraction of 1000.
"""
oss = settings.load_oss_params()
assert oss.IPAM.INFOBLOX
infoblox_params = oss.IPAM.INFOBLOX
ip_version = _ip_network_version(network)
assert ip_version == 4, "Utilization is only available for IPv4 networks."
params = {
'network': network,
'_return_fields': 'network,total_hosts,utilization'
}
r = requests.get(
f'{_wapi(infoblox_params)}/network',
params=params,
auth=HTTPBasicAuth(infoblox_params.username,
infoblox_params.password),
verify=False
)
# Utilization info takes several minutes to converge.
# The IPAM utilization bar in the GUI as well. Why?
assert r.status_code >= 200 and r.status_code < 300, \
f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
capacity_info = r.json()
assert len(capacity_info) == 1, "Requested IPv4 network doesn't exist."
assert 'utilization' in capacity_info[0]
utilization = capacity_info[0]['utilization']
return utilization
def _allocate_network( def _allocate_network(
infoblox_params: settings.InfoBloxParams, infoblox_params: settings.InfoBloxParams,
network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams], network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams],
...@@ -254,42 +221,64 @@ def _find_next_available_ip(infoblox_params, network_ref): ...@@ -254,42 +221,64 @@ def _find_next_available_ip(infoblox_params, network_ref):
return received_ip[0] return received_ip[0]
def _allocate_host(hostname=None, addr=None, network=None, extattrs={} def _allocate_host(hostname=None,
) -> Union[V4HostAddress, V6HostAddress]: addrs=None,
networks=None,
extattrs={}
) -> HostAddresses:
""" """
If network is not None, allocate host in that network. If networks is not None, allocate host in those networks.
Otherwise if addr is not None, allocate host with that address. Otherwise if addrs is not None, allocate host with those addresses.
hostname parameter must be full name including domain name. hostname parameter must be full name including domain name.
""" """
# TODO: should hostnames be unique # TODO: should hostnames be unique
# (i.e. fail if hostname already exists in this domain/service)? # (i.e. fail if hostname already exists in this domain/service)?
assert addr or network, \ assert addrs or networks, \
"You must specify either the host address or the network CIDR." "You must specify either the host addresses or the networks CIDR."
oss = settings.load_oss_params() oss = settings.load_oss_params()
assert oss.IPAM.INFOBLOX assert oss.IPAM.INFOBLOX
infoblox_params = oss.IPAM.INFOBLOX infoblox_params = oss.IPAM.INFOBLOX
if network: if networks:
ip_version = _ip_network_version(network) ipv4_network = networks[0]
# Find the next available IP address in the network ipv6_network = networks[1]
network_info = _find_networks(network=network, ip_version=ip_version) assert _ip_network_version(ipv4_network) == 4
assert _ip_network_version(ipv6_network) == 6
# Find the next available IP address in each network
network_info = _find_networks(network=ipv4_network, ip_version=4)
assert len(network_info) == 1, \
"IPv4 Network does not exist. Create it first."
assert '_ref' in network_info[0]
ipv4_addr = _find_next_available_ip(infoblox_params,
network_info[0]["_ref"])
network_info = _find_networks(network=ipv6_network, ip_version=6)
assert len(network_info) == 1, \ assert len(network_info) == 1, \
"Network does not exist. Create it first." "IPv6 Network does not exist. Create it first."
assert '_ref' in network_info[0] assert '_ref' in network_info[0]
addr = _find_next_available_ip(infoblox_params, ipv6_addr = _find_next_available_ip(infoblox_params,
network_info[0]["_ref"]) network_info[0]["_ref"])
else: else:
ip_version = _ip_addr_version(addr) ipv4_addr = addrs[0]
ipv6_addr = addrs[1]
assert _ip_addr_version(ipv4_addr) == 4
assert _ip_addr_version(ipv6_addr) == 6
ip_req_payload = { ip_req_payload = {
f"ipv{ip_version}addrs": [ "ipv4addrs": [
{ {
f"ipv{ip_version}addr": addr "ipv4addr": ipv4_addr
}
],
"ipv6addrs": [
{
"ipv6addr": ipv6_addr
} }
], ],
"name": hostname, "name": hostname,
"configure_for_dns": False, "configure_for_dns": True,
"view": "default", "view": "default",
"extattrs": extattrs "extattrs": extattrs
} }
...@@ -306,31 +295,8 @@ def _allocate_host(hostname=None, addr=None, network=None, extattrs={} ...@@ -306,31 +295,8 @@ def _allocate_host(hostname=None, addr=None, network=None, extattrs={}
assert isinstance(r.json(), str) assert isinstance(r.json(), str)
assert r.json().startswith("record:host/") assert r.json().startswith("record:host/")
dns_req_payload = { return HostAddresses(v4=ipaddress.ip_address(ipv4_addr),
f"ipv{ip_version}addr": addr, v6=ipaddress.ip_address(ipv6_addr))
"name": hostname,
"view": "default",
"extattrs": extattrs
}
endpoint = 'record:a' if ip_version == 4 else 'record:aaaa'
r = requests.post(
f'{_wapi(infoblox_params)}/{endpoint}',
json=dns_req_payload,
auth=HTTPBasicAuth(infoblox_params.username,
infoblox_params.password),
verify=False
)
assert r.status_code >= 200 and r.status_code < 300, \
f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
assert isinstance(r.json(), str)
assert r.json().startswith(f"{endpoint}/")
if ip_version == 4:
return V4HostAddress(v4=addr)
else:
return V6HostAddress(v6=addr)
def allocate_service_host(hostname=None, def allocate_service_host(hostname=None,
...@@ -340,19 +306,16 @@ def allocate_service_host(hostname=None, ...@@ -340,19 +306,16 @@ def allocate_service_host(hostname=None,
extattrs={} extattrs={}
) -> HostAddresses: ) -> HostAddresses:
""" """
Allocate host with both IPv4 and IPv6 address (and respective DNS Allocate host record with both IPv4 and IPv6 address, and respective DNS
records). A and AAAA records.
The domain name is also taken from the service type and appended to - If service_networks is provided, that one is used.
specified hostname. - If service_networks is not provided, and host_addresses is provided,
If service_networks is provided, that one is used.
If service_networks is not provided, and host_addresses is provided,
those specific addresses are used. those specific addresses are used.
If neither is not provided, the first network with available space for - If neither is not provided, new ipv4 and ipv6 networks are created and
this service type is used. those are used. Note that in this case extattrs is for the hosts and not
Note that if WFO will always specify the network/addresses after for the networks.
creating it, this mode won't be needed. Currently this mode doesn't The domain name is taken from the service type and appended to the
look further than the first container, so if needed, this will need specified hostname.
to be updated.
""" """
oss = settings.load_oss_params() oss = settings.load_oss_params()
assert oss.IPAM assert oss.IPAM
...@@ -364,74 +327,59 @@ def allocate_service_host(hostname=None, ...@@ -364,74 +327,59 @@ def allocate_service_host(hostname=None,
ipv6_containers = getattr(ipam_params, service_type).V6.containers ipv6_containers = getattr(ipam_params, service_type).V6.containers
domain_name = getattr(ipam_params, service_type).domain_name domain_name = getattr(ipam_params, service_type).domain_name
# IPv4
if not service_networks and not host_addresses: if not service_networks and not host_addresses:
ipv4_networks_info = _find_networks( # IPv4
network_container=str(ipv4_containers[0]), ip_version=4) ipv4_network = str(allocate_service_ipv4_network(
assert len(ipv4_networks_info) >= 1, \ service_type=service_type).v4)
"No IPv4 network exists in the container for this service type." assert ipv4_network, \
first_nonfull_ipv4_network = None "No available space for IPv4 networks for this service type."
for ipv4_network_info in ipv4_networks_info:
assert 'network' in ipv4_network_info # IPv6
capacity = _get_network_capacity(ipv4_network_info["network"]) ipv6_network = str(allocate_service_ipv6_network(
if capacity < 1000: service_type=service_type).v6)
first_nonfull_ipv4_network = ipv4_network_info["network"] assert ipv6_network, \
break "No available space for IPv6 networks for this service type."
# Create a new network if the existing networks in the container for
# the service type are all full. network_tuple = (ipv4_network, ipv6_network)
if not first_nonfull_ipv4_network: host = _allocate_host(hostname=hostname+domain_name,
first_nonfull_ipv4_network = str(allocate_service_ipv4_network( networks=network_tuple,
service_type=service_type).v4) extattrs=extattrs)
assert first_nonfull_ipv4_network, \
"No available IPv4 addresses for this service type."
v4_host = _allocate_host(hostname=hostname+domain_name,
network=first_nonfull_ipv4_network,
extattrs=extattrs)
elif service_networks: elif service_networks:
network = service_networks.v4 # IPv4
assert any(network.subnet_of(ipv4_container) ipv4_network = service_networks.v4
for ipv4_container in ipv4_containers) assert any(ipv4_network.subnet_of(ipv4_container)
v4_host = _allocate_host(hostname=hostname+domain_name,
network=str(network),
extattrs=extattrs)
elif host_addresses:
addr = host_addresses.v4
assert any(addr in ipv4_container
for ipv4_container in ipv4_containers) for ipv4_container in ipv4_containers)
v4_host = _allocate_host(hostname=hostname+domain_name,
addr=str(addr),
extattrs=extattrs)
# IPv6 # IPv6
if not service_networks and not host_addresses: ipv6_network = service_networks.v6
# ipv6 does not support capacity fetching (not even the GUI displays assert any(ipv6_network.subnet_of(ipv6_container)
# it). Maybe it's assumed that there is always available space?
ipv6_networks_info = _find_networks(
network_container=str(ipv6_containers[0]),
ip_version=6)
assert len(ipv6_networks_info) >= 1, \
"No IPv6 network exists in the container for this service type."
assert 'network' in ipv6_networks_info[0]
# TODO: if "no available IP" error, create a new network?
v6_host = _allocate_host(hostname=hostname+domain_name,
network=ipv6_networks_info[0]['network'],
extattrs=extattrs)
elif service_networks:
network = service_networks.v6
assert any(network.subnet_of(ipv6_container)
for ipv6_container in ipv6_containers) for ipv6_container in ipv6_containers)
v6_host = _allocate_host(hostname=hostname+domain_name,
network=str(network), host = _allocate_host(
extattrs=extattrs) hostname=hostname+domain_name,
networks=(str(ipv4_network), str(ipv6_network)),
extattrs=extattrs
)
elif host_addresses: elif host_addresses:
addr = host_addresses.v6 # IPv4
assert any(addr in ipv6_container ipv4_addr = host_addresses.v4
assert any(ipv4_addr in ipv4_container
for ipv4_container in ipv4_containers)
# IPv6
ipv6_addr = host_addresses.v6
assert any(ipv6_addr in ipv6_container
for ipv6_container in ipv6_containers) for ipv6_container in ipv6_containers)
v6_host = _allocate_host(hostname=hostname+domain_name,
addr=str(addr),
extattrs=extattrs)
return HostAddresses(v4=v4_host.v4, v6=v6_host.v6) host = _allocate_host(
hostname=hostname+domain_name,
addrs=(str(ipv4_addr), str(ipv6_addr)),
extattrs=extattrs
)
return host
""" """
...@@ -462,6 +410,39 @@ def _find_containers(network=None, ip_version=4): ...@@ -462,6 +410,39 @@ def _find_containers(network=None, ip_version=4):
return r.json() return r.json()
def _get_network_capacity(network=None):
"""
Get utilization of a IPv4 network in a fraction of 1000.
"""
oss = settings.load_oss_params()
assert oss.IPAM.INFOBLOX
infoblox_params = oss.IPAM.INFOBLOX
ip_version = _ip_network_version(network)
assert ip_version == 4, "Utilization is only available for IPv4 networks."
params = {
'network': network,
'_return_fields': 'network,total_hosts,utilization'
}
r = requests.get(
f'{_wapi(infoblox_params)}/network',
params=params,
auth=HTTPBasicAuth(infoblox_params.username,
infoblox_params.password),
verify=False
)
# Utilization info takes several minutes to converge.
# The IPAM utilization bar in the GUI as well. Why?
assert r.status_code >= 200 and r.status_code < 300, \
f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}"
capacity_info = r.json()
assert len(capacity_info) == 1, "Requested IPv4 network doesn't exist."
assert 'utilization' in capacity_info[0]
utilization = capacity_info[0]['utilization']
return utilization
def _delete_network(network) -> Union[V4ServiceNetwork, V6ServiceNetwork]: def _delete_network(network) -> Union[V4ServiceNetwork, V6ServiceNetwork]:
""" """
Delete IPv4 or IPv6 network by CIDR. Delete IPv4 or IPv6 network by CIDR.
...@@ -642,15 +623,22 @@ if __name__ == '__main__': ...@@ -642,15 +623,22 @@ if __name__ == '__main__':
elif choice == '6': elif choice == '6':
hostname = input("Enter host name (full name w/ domain name): ") hostname = input("Enter host name (full name w/ domain name): ")
addr = input("Enter IP address to allocate: ") addrv4 = input("Enter IPv4 address to allocate: ")
alloc_ip = _allocate_host(hostname=hostname, addr=addr) addrv6 = input("Enter IPv6 address to allocate: ")
alloc_ip = _allocate_host(hostname=hostname,
addrs=(addrv4,addrv6))
print(json.dumps(str(alloc_ip), indent=2)) print(json.dumps(str(alloc_ip), indent=2))
elif choice == '7': elif choice == '7':
hostname = input("Enter host name (full name w/ domain name): ") hostname = input("Enter host name (full name w/ domain name): ")
network = input( networkv4 = input(
"Enter existing network to allocate from (CIDR notation): ") "Enter existing ipv4 network to use (CIDR notation): ")
alloc_ip = _allocate_host(hostname=hostname, network=network) networkv6 = input(
"Enter existing ipv6 network to use (CIDR notation): ")
alloc_ip = _allocate_host(
hostname=hostname,
networks=(networkv4, networkv6)
)
print(json.dumps(str(alloc_ip), indent=2)) print(json.dumps(str(alloc_ip), indent=2))
elif choice == '8': elif choice == '8':
......
...@@ -55,7 +55,6 @@ def new_service_host(hostname, ...@@ -55,7 +55,6 @@ def new_service_host(hostname,
extattrs=extattrs) extattrs=extattrs)
'''
if __name__ == '__main__': if __name__ == '__main__':
# sample call flow to allocate two loopback interfaces and a trunk service # sample call flow to allocate two loopback interfaces and a trunk service
# new_service_host can be called passing networks or addresses # new_service_host can be called passing networks or addresses
...@@ -75,11 +74,9 @@ if __name__ == '__main__': ...@@ -75,11 +74,9 @@ if __name__ == '__main__':
) )
lo1_v4_host_address = lo1_service_networks.v4.network_address lo1_v4_host_address = lo1_service_networks.v4.network_address
lo1_v6_host_address = lo1_service_networks.v6.network_address lo1_v6_host_address = lo1_service_networks.v6.network_address
print(lo1_v4_host_address)
print(lo1_v6_host_address)
lo1_host_addresses = HostAddresses(v4=lo1_v4_host_address, lo1_host_addresses = HostAddresses(v4=lo1_v4_host_address,
v6=lo1_v6_host_address) v6=lo1_v6_host_address)
new_service_host(hostname=hostname_A, new_service_host(hostname=hostname_A+"_LO",
service_type='LO', service_type='LO',
host_addresses=lo1_host_addresses) host_addresses=lo1_host_addresses)
...@@ -94,7 +91,7 @@ if __name__ == '__main__': ...@@ -94,7 +91,7 @@ if __name__ == '__main__':
new_service_networks(service_type='LO', new_service_networks(service_type='LO',
comment="Network for h2 LO", comment="Network for h2 LO",
extattrs=lo2_network_extattrs) extattrs=lo2_network_extattrs)
new_service_host(hostname=hostname_B, new_service_host(hostname=hostname_B+"_LO",
service_type='LO', service_type='LO',
service_networks=lo2_service_networks, service_networks=lo2_service_networks,
extattrs=lo2_host_extattrs) extattrs=lo2_host_extattrs)
...@@ -104,10 +101,9 @@ if __name__ == '__main__': ...@@ -104,10 +101,9 @@ if __name__ == '__main__':
service_type='TRUNK', service_type='TRUNK',
comment="Network for h1-h2 TRUNK" comment="Network for h1-h2 TRUNK"
) )
new_service_host(hostname=hostname_A, new_service_host(hostname=hostname_A+"_TRUNK",
service_type='TRUNK', service_type='TRUNK',
service_networks=trunk12_service_networks) service_networks=trunk12_service_networks)
new_service_host(hostname=hostname_B, new_service_host(hostname=hostname_B+"_TRUNK",
service_type='TRUNK', service_type='TRUNK',
service_networks=trunk12_service_networks) service_networks=trunk12_service_networks)
'''
...@@ -81,16 +81,34 @@ def test_new_service_host(data_config_filename): ...@@ -81,16 +81,34 @@ def test_new_service_host(data_config_filename):
responses.add( responses.add(
method=responses.POST, method=responses.POST,
url=re.compile(r'.*/wapi.*/network.*/.*?_function=next_available_ip&num=1.*'), # noqa: E501 url=re.compile(r'.*/wapi.*/network.*/.*?_function=next_available_ip&num=1$'), # noqa: E501
json={'ips': ['10.255.255.20']} json={'ips': ['10.255.255.20']}
) )
responses.add( responses.add(
method=responses.POST, method=responses.POST,
url=re.compile(r'.*/wapi.*/ipv6network.*/.*?_function=next_available_ip&num=1.*'), # noqa: E501 url=re.compile(r'.*/wapi.*/ipv6network.*/.*?_function=next_available_ip&num=1$'), # noqa: E501
json={'ips': ['dead:beef::18']} json={'ips': ['dead:beef::18']}
) )
responses.add(
method=responses.POST,
url=re.compile(r'.*/wapi.*/network.*_return_fields.*'),
json={
'_ref': 'network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default', # noqa: E501
'network': '10.255.255.20/32'
}
)
responses.add(
method=responses.POST,
url=re.compile(r'.*/wapi.*/ipv6network.*_return_fields.*'),
json={
'_ref': 'ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default', # noqa: E501
'network': 'dead:beef::18/128'
}
)
service_hosts = ipam.new_service_host( service_hosts = ipam.new_service_host(
hostname='test', hostname='test',
service_type='LO', service_type='LO',
...@@ -116,3 +134,12 @@ def test_new_service_host(data_config_filename): ...@@ -116,3 +134,12 @@ def test_new_service_host(data_config_filename):
v4=ipaddress.ip_address('10.255.255.20'), v4=ipaddress.ip_address('10.255.255.20'),
v6=ipaddress.ip_address('dead:beef::18') v6=ipaddress.ip_address('dead:beef::18')
) )
service_hosts = ipam.new_service_host(
hostname='test',
service_type='LO'
)
assert service_hosts == ipam.HostAddresses(
v4=ipaddress.ip_address('10.255.255.20'),
v6=ipaddress.ip_address('dead:beef::18')
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment