From 45a007ad344b77ecb4730c50f931d5f8e3a87427 Mon Sep 17 00:00:00 2001 From: Ubuntu <jorge.sasiain@ehu.eus> Date: Tue, 18 Apr 2023 10:47:36 +0000 Subject: [PATCH] IPAM network endpoints with ipv6 support --- gso/services/_ipam.py | 61 ++++++++++++++++++++++++++++++++----------- 1 file changed, 46 insertions(+), 15 deletions(-) diff --git a/gso/services/_ipam.py b/gso/services/_ipam.py index 64a80f99..50a6de37 100644 --- a/gso/services/_ipam.py +++ b/gso/services/_ipam.py @@ -44,25 +44,28 @@ def _wapi(infoblox_params: settings.InfoBloxParams): return (f'https://{infoblox_params.host}' f'/wapi/{infoblox_params.wapi_version}') -def find_containers(network=None): +def find_containers(network=None, ip_version=4): + assert ip_version in [4, 6] oss = settings.load_oss_params() assert oss.IPAM.INFOBLOX infoblox_params = oss.IPAM.INFOBLOX + endpoint = 'networkcontainer' if ip_version == 4 else 'ipv6networkcontainer' r = requests.get( - f'{_wapi(infoblox_params)}/networkcontainer', + f'{_wapi(infoblox_params)}/{endpoint}', params={'network': network} if network else None, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False) assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" return r.json() - -def find_networks(network=None): +def find_networks(network=None, ip_version=4): + assert ip_version in [4, 6] oss = settings.load_oss_params() assert oss.IPAM.INFOBLOX infoblox_params = oss.IPAM.INFOBLOX + endpoint = 'network' if ip_version == 4 else 'ipv6network' r = requests.get( - f'{_wapi(infoblox_params)}/network', + f'{_wapi(infoblox_params)}/{endpoint}', params={'network': network} if network else None, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False) @@ -71,8 +74,9 @@ def find_networks(network=None): def _allocate_network(infoblox_params: settings.InfoBloxParams, network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams], ip_version=4): assert ip_version in [4, 6] + endpoint = 'network' if ip_version == 4 else 'ipv6network' - _req_payload = { + _ipv4_req_payload = { "network": { "_object_function": "next_available_network", "_parameters": { @@ -82,12 +86,28 @@ def _allocate_network(infoblox_params: settings.InfoBloxParams, network_params: "_object_parameters": { "network": str(network_params.container) }, - "_result_field": "networks", + "_result_field": "networks", # only return in the response the allocated network, not all available + } + } + + _ipv6_req_payload = { + "network": { + "_object_function": "next_available_network", + "_parameters": { + "cidr": network_params.mask + }, + "_object": "ipv6networkcontainer", + "_object_parameters": { + "network": str(network_params.container) + }, + "_result_field": "networks", # only return in the response the allocated network, not all available } } + _req_payload = _ipv4_req_payload if ip_version == 4 else _ipv6_req_payload + r = requests.post( - f'{_wapi(infoblox_params)}/network', + f'{_wapi(infoblox_params)}/{endpoint}', params={'_return_fields': 'network'}, json=_req_payload, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), @@ -121,7 +141,16 @@ def delete_network(network): oss = settings.load_oss_params() assert oss.IPAM.INFOBLOX infoblox_params = oss.IPAM.INFOBLOX - network_info = find_networks(network=network) + + ip_version = None + ip_network = ipaddress.ip_network(network) + if isinstance(ip_network, ipaddress.IPv4Network): + ip_version = 4 + elif isinstance(ip_network, ipaddress.IPv6Network): + ip_version = 6 + assert ip_version in [4, 6] + + network_info = find_networks(network=network, ip_version=ip_version) assert len(network_info) == 1, "Network does not exist." assert '_ref' in network_info[0] @@ -133,10 +162,10 @@ def delete_network(network): # Extract ipv4/ipv6 address from the network reference obtained in the response r_json = r.json() - network_address = ipaddress.ip_network(r_json.rsplit("/", 1)[0].split(":")[1]) - if isinstance(network_address, ipaddress.IPv4Network): + network_address = ipaddress.ip_network(r_json.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) + if ip_version == 4: return V4ServiceNetwork(v4=network_address) - elif isinstance(network_address, ipaddress.IPv6Network): + else: return V6ServiceNetwork(v6=network_address) def _find_next_available_ip(infoblox_params, network_ref): @@ -224,12 +253,14 @@ if __name__ == '__main__': choice = input("Enter your choice: ") if choice == '1': - containers = find_containers() + ip_version = int(input("Enter IP version (4 or 6): ")) + containers = find_containers(ip_version=ip_version) print(json.dumps(containers, indent=2)) elif choice == '2': - all_networks = find_networks() - print(json.dumps(all_networks, indent=2)) + ip_version = int(input("Enter IP version (4 or 6): ")) + networks = find_networks(ip_version=ip_version) + print(json.dumps(networks, indent=2)) elif choice == '3': service_type = input("Enter service type: ") -- GitLab