Skip to content
Snippets Groups Projects
Commit 71d2b14f authored by JORGE SASIAIN's avatar JORGE SASIAIN
Browse files

IPAM experimentation: Implement v6 host allocation/deletion

parent 45a007ad
No related branches found
No related tags found
1 merge request!9Ipam service
......@@ -44,6 +44,26 @@ def _wapi(infoblox_params: settings.InfoBloxParams):
return (f'https://{infoblox_params.host}'
f'/wapi/{infoblox_params.wapi_version}')
def _ip_addr_version(addr):
ip_version = None
ip_addr = ipaddress.ip_address(addr)
if isinstance(ip_addr, ipaddress.IPv4Address):
ip_version = 4
elif isinstance(ip_addr, ipaddress.IPv6Address):
ip_version = 6
assert ip_version in [4, 6]
return ip_version
def _ip_network_version(network):
ip_version = None
ip_network = ipaddress.ip_network(network)
if isinstance(ip_network, ipaddress.IPv4Network):
ip_version = 4
elif isinstance(ip_network, ipaddress.IPv6Network):
ip_version = 6
assert ip_version in [4, 6]
return ip_version
def find_containers(network=None, ip_version=4):
assert ip_version in [4, 6]
oss = settings.load_oss_params()
......@@ -142,13 +162,7 @@ def delete_network(network):
assert oss.IPAM.INFOBLOX
infoblox_params = oss.IPAM.INFOBLOX
ip_version = None
ip_network = ipaddress.ip_network(network)
if isinstance(ip_network, ipaddress.IPv4Network):
ip_version = 4
elif isinstance(ip_network, ipaddress.IPv6Network):
ip_version = 6
assert ip_version in [4, 6]
ip_version = _ip_network_version(network)
network_info = find_networks(network=network, ip_version=ip_version)
assert len(network_info) == 1, "Network does not exist."
......@@ -179,22 +193,26 @@ def _find_next_available_ip(infoblox_params, network_ref):
assert len(received_ip) == 1
return received_ip[0]
def allocate_host(mac=None, hostname=None, ipv4addr=None, network_cidr=None):
def allocate_host(mac=None, hostname=None, addr=None, network=None):
oss = settings.load_oss_params()
assert oss.IPAM.INFOBLOX
infoblox_params = oss.IPAM.INFOBLOX
if network_cidr:
if network:
ip_version = _ip_network_version(network)
# Find the next available IP address in the network
network_info = find_networks(network=network_cidr)
network_info = find_networks(network=network, ip_version=ip_version)
assert len(network_info) == 1, "Network does not exist. Create it first."
assert '_ref' in network_info[0]
ipv4addr = _find_next_available_ip(infoblox_params, network_info[0]["_ref"])
addr = _find_next_available_ip(infoblox_params, network_info[0]["_ref"])
_req_payload = {
else:
ip_version = _ip_addr_version(addr)
_ipv4_req_payload = {
"ipv4addrs": [
{
"ipv4addr": ipv4addr,
"ipv4addr": addr,
"mac": mac
}
],
......@@ -203,6 +221,19 @@ def allocate_host(mac=None, hostname=None, ipv4addr=None, network_cidr=None):
"view": "default"
}
_ipv6_req_payload = {
"ipv6addrs": [
{
"ipv6addr": addr
}
],
"name": hostname,
"configure_for_dns": False,
"view": "default"
}
_req_payload = _ipv4_req_payload if ip_version == 4 else _ipv6_req_payload
r = requests.post(
f'{_wapi(infoblox_params)}/record:host',
json=_req_payload,
......@@ -212,17 +243,23 @@ def allocate_host(mac=None, hostname=None, ipv4addr=None, network_cidr=None):
assert isinstance(r.json(), str)
assert r.json().startswith("record:host/")
return V4HostAddress(v4=ipv4addr)
if ip_version == 4:
return V4HostAddress(v4=addr)
else:
return V6HostAddress(v6=addr)
def delete_host_by_ip(ipv4addr):
def delete_host_by_ip(addr):
oss = settings.load_oss_params()
assert oss.IPAM.INFOBLOX
infoblox_params = oss.IPAM.INFOBLOX
ip_version = _ip_addr_version(addr)
ip_param = 'ipv4addr' if ip_version == 4 else 'ipv6addr'
# Find host record reference
r = requests.get(
f'{_wapi(infoblox_params)}/record:host',
params={'ipv4addr': ipv4addr},
params={ip_param: addr},
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False)
host_data = r.json()
......@@ -236,8 +273,11 @@ def delete_host_by_ip(ipv4addr):
auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password),
verify=False)
r.raise_for_status()
return V4HostAddress(v4=ipv4addr)
if ip_version == 4:
return V4HostAddress(v4=addr)
else:
return V6HostAddress(v6=addr)
if __name__ == '__main__':
while True:
......@@ -265,7 +305,6 @@ if __name__ == '__main__':
elif choice == '3':
service_type = input("Enter service type: ")
ip_version = int(input("Enter IP version (4 or 6): "))
if ip_version == 4:
new_network = allocate_service_ipv4_network(service_type=service_type)
elif ip_version == 6:
......@@ -273,7 +312,6 @@ if __name__ == '__main__':
else:
print("Invalid IP version. Please enter either 4 or 6.")
continue
print(json.dumps(str(new_network), indent=2))
elif choice == '4':
......@@ -283,21 +321,21 @@ if __name__ == '__main__':
elif choice == '5':
hostname = input("Enter host name: ")
ipv4addr = input("Enter IPv4 address to allocate: ")
mac = input("Enter mac address: ")
alloc_ip = allocate_host(hostname=hostname, ipv4addr=ipv4addr, mac=mac)
addr = input("Enter IP address to allocate: ")
mac = input("Enter mac address (you can leave empty if IPv6): ")
alloc_ip = allocate_host(hostname=hostname, addr=addr, mac=mac)
print(json.dumps(str(alloc_ip), indent=2))
elif choice == '6':
hostname = input("Enter host name: ")
network_cidr = input("Enter an existing network CIDR to allocate from: ")
mac = input("Enter mac address: ")
alloc_ip = allocate_host(hostname=hostname, network_cidr=network_cidr, mac=mac)
network = input("Enter an existing network to allocate from (in CIDR notation): ")
mac = input("Enter mac address (you can leave empty if IPv6): ")
alloc_ip = allocate_host(hostname=hostname, network=network, mac=mac)
print(json.dumps(str(alloc_ip), indent=2))
elif choice == '7':
ipv4addr = input("Enter IPv4 address: ")
deleted_host = delete_host_by_ip(ipv4addr=ipv4addr)
addr = input("Enter IP address of host to delete: ")
deleted_host = delete_host_by_ip(addr=addr)
print(json.dumps(str(deleted_host), indent=2))
elif choice == '8':
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment