diff --git a/gso/services/_ipam.py b/gso/services/_ipam.py index 50a6de37750f7b7f69b4d0840c2f7e3e2d892e14..332ad46ce76d2e3e88eef138a3201aa354b90fc0 100644 --- a/gso/services/_ipam.py +++ b/gso/services/_ipam.py @@ -44,6 +44,26 @@ def _wapi(infoblox_params: settings.InfoBloxParams): return (f'https://{infoblox_params.host}' f'/wapi/{infoblox_params.wapi_version}') +def _ip_addr_version(addr): + ip_version = None + ip_addr = ipaddress.ip_address(addr) + if isinstance(ip_addr, ipaddress.IPv4Address): + ip_version = 4 + elif isinstance(ip_addr, ipaddress.IPv6Address): + ip_version = 6 + assert ip_version in [4, 6] + return ip_version + +def _ip_network_version(network): + ip_version = None + ip_network = ipaddress.ip_network(network) + if isinstance(ip_network, ipaddress.IPv4Network): + ip_version = 4 + elif isinstance(ip_network, ipaddress.IPv6Network): + ip_version = 6 + assert ip_version in [4, 6] + return ip_version + def find_containers(network=None, ip_version=4): assert ip_version in [4, 6] oss = settings.load_oss_params() @@ -142,13 +162,7 @@ def delete_network(network): assert oss.IPAM.INFOBLOX infoblox_params = oss.IPAM.INFOBLOX - ip_version = None - ip_network = ipaddress.ip_network(network) - if isinstance(ip_network, ipaddress.IPv4Network): - ip_version = 4 - elif isinstance(ip_network, ipaddress.IPv6Network): - ip_version = 6 - assert ip_version in [4, 6] + ip_version = _ip_network_version(network) network_info = find_networks(network=network, ip_version=ip_version) assert len(network_info) == 1, "Network does not exist." @@ -179,22 +193,26 @@ def _find_next_available_ip(infoblox_params, network_ref): assert len(received_ip) == 1 return received_ip[0] -def allocate_host(mac=None, hostname=None, ipv4addr=None, network_cidr=None): +def allocate_host(mac=None, hostname=None, addr=None, network=None): oss = settings.load_oss_params() assert oss.IPAM.INFOBLOX infoblox_params = oss.IPAM.INFOBLOX - if network_cidr: + if network: + ip_version = _ip_network_version(network) # Find the next available IP address in the network - network_info = find_networks(network=network_cidr) + network_info = find_networks(network=network, ip_version=ip_version) assert len(network_info) == 1, "Network does not exist. Create it first." assert '_ref' in network_info[0] - ipv4addr = _find_next_available_ip(infoblox_params, network_info[0]["_ref"]) + addr = _find_next_available_ip(infoblox_params, network_info[0]["_ref"]) - _req_payload = { + else: + ip_version = _ip_addr_version(addr) + + _ipv4_req_payload = { "ipv4addrs": [ { - "ipv4addr": ipv4addr, + "ipv4addr": addr, "mac": mac } ], @@ -203,6 +221,19 @@ def allocate_host(mac=None, hostname=None, ipv4addr=None, network_cidr=None): "view": "default" } + _ipv6_req_payload = { + "ipv6addrs": [ + { + "ipv6addr": addr + } + ], + "name": hostname, + "configure_for_dns": False, + "view": "default" + } + + _req_payload = _ipv4_req_payload if ip_version == 4 else _ipv6_req_payload + r = requests.post( f'{_wapi(infoblox_params)}/record:host', json=_req_payload, @@ -212,17 +243,23 @@ def allocate_host(mac=None, hostname=None, ipv4addr=None, network_cidr=None): assert isinstance(r.json(), str) assert r.json().startswith("record:host/") - return V4HostAddress(v4=ipv4addr) + if ip_version == 4: + return V4HostAddress(v4=addr) + else: + return V6HostAddress(v6=addr) -def delete_host_by_ip(ipv4addr): +def delete_host_by_ip(addr): oss = settings.load_oss_params() assert oss.IPAM.INFOBLOX infoblox_params = oss.IPAM.INFOBLOX + ip_version = _ip_addr_version(addr) + ip_param = 'ipv4addr' if ip_version == 4 else 'ipv6addr' + # Find host record reference r = requests.get( f'{_wapi(infoblox_params)}/record:host', - params={'ipv4addr': ipv4addr}, + params={ip_param: addr}, auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False) host_data = r.json() @@ -236,8 +273,11 @@ def delete_host_by_ip(ipv4addr): auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), verify=False) r.raise_for_status() - return V4HostAddress(v4=ipv4addr) + if ip_version == 4: + return V4HostAddress(v4=addr) + else: + return V6HostAddress(v6=addr) if __name__ == '__main__': while True: @@ -265,7 +305,6 @@ if __name__ == '__main__': elif choice == '3': service_type = input("Enter service type: ") ip_version = int(input("Enter IP version (4 or 6): ")) - if ip_version == 4: new_network = allocate_service_ipv4_network(service_type=service_type) elif ip_version == 6: @@ -273,7 +312,6 @@ if __name__ == '__main__': else: print("Invalid IP version. Please enter either 4 or 6.") continue - print(json.dumps(str(new_network), indent=2)) elif choice == '4': @@ -283,21 +321,21 @@ if __name__ == '__main__': elif choice == '5': hostname = input("Enter host name: ") - ipv4addr = input("Enter IPv4 address to allocate: ") - mac = input("Enter mac address: ") - alloc_ip = allocate_host(hostname=hostname, ipv4addr=ipv4addr, mac=mac) + addr = input("Enter IP address to allocate: ") + mac = input("Enter mac address (you can leave empty if IPv6): ") + alloc_ip = allocate_host(hostname=hostname, addr=addr, mac=mac) print(json.dumps(str(alloc_ip), indent=2)) elif choice == '6': hostname = input("Enter host name: ") - network_cidr = input("Enter an existing network CIDR to allocate from: ") - mac = input("Enter mac address: ") - alloc_ip = allocate_host(hostname=hostname, network_cidr=network_cidr, mac=mac) + network = input("Enter an existing network to allocate from (in CIDR notation): ") + mac = input("Enter mac address (you can leave empty if IPv6): ") + alloc_ip = allocate_host(hostname=hostname, network=network, mac=mac) print(json.dumps(str(alloc_ip), indent=2)) elif choice == '7': - ipv4addr = input("Enter IPv4 address: ") - deleted_host = delete_host_by_ip(ipv4addr=ipv4addr) + addr = input("Enter IP address of host to delete: ") + deleted_host = delete_host_by_ip(addr=addr) print(json.dumps(str(deleted_host), indent=2)) elif choice == '8':