diff --git a/.gitignore b/.gitignore index 20969d16f10cfe87950112c3dfb1a3a00e4ef5fb..af56c6d634b0fe79fee18e851f5f3163bceced53 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ __pycache__/ .coverage coverage.xml .tox/device_vendor -.vscode \ No newline at end of file +.vscode +oss-params.json diff --git a/gso/services/_ipam.py b/gso/services/_ipam.py new file mode 100644 index 0000000000000000000000000000000000000000..64a80f998bcf9581df20a5fef7931a16dc5b552f --- /dev/null +++ b/gso/services/_ipam.py @@ -0,0 +1,277 @@ +import requests +from requests.auth import HTTPBasicAuth +import json +import ipaddress +from pydantic import BaseSettings +from typing import Union + +from gso import settings + + +class V4ServiceNetwork(BaseSettings): + v4: ipaddress.IPv4Network + + +class V6ServiceNetwork(BaseSettings): + v6: ipaddress.IPv6Network + + +class ServiceNetworks(BaseSettings): + v4: V4ServiceNetwork + v6: V6ServiceNetwork + + +class V4HostAddress(BaseSettings): + v4: ipaddress.IPv4Address + + +class V6HostAddress(BaseSettings): + v6: ipaddress.IPv6Address + + +class HostAddresses(BaseSettings): + v4: V4HostAddress + v6: V6HostAddress + + +# TODO: remove this! +# lab infoblox cert is not valid for the ipv4 address +# ... disable warnings for now +requests.packages.urllib3.disable_warnings() + + +def _wapi(infoblox_params: settings.InfoBloxParams): + return (f'https://{infoblox_params.host}' + f'/wapi/{infoblox_params.wapi_version}') + +def find_containers(network=None): + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + r = requests.get( + f'{_wapi(infoblox_params)}/networkcontainer', + params={'network': network} if network else None, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + return r.json() + + +def find_networks(network=None): + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + r = requests.get( + f'{_wapi(infoblox_params)}/network', + params={'network': network} if network else None, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + return r.json() + +def _allocate_network(infoblox_params: settings.InfoBloxParams, network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams], ip_version=4): + assert ip_version in [4, 6] + + _req_payload = { + "network": { + "_object_function": "next_available_network", + "_parameters": { + "cidr": network_params.mask + }, + "_object": "networkcontainer", + "_object_parameters": { + "network": str(network_params.container) + }, + "_result_field": "networks", + } + } + + r = requests.post( + f'{_wapi(infoblox_params)}/network', + params={'_return_fields': 'network'}, + json=_req_payload, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + headers={'content-type': "application/json"}, + verify=False) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + allocated_network = r.json()['network'] + if ip_version==4: + return V4ServiceNetwork(v4=allocated_network) + else: + return V6ServiceNetwork(v6=allocated_network) + +def allocate_service_ipv4_network(service_type): + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + # sanity: verify the service_type is a proper one + assert hasattr(ipam_params, service_type) and service_type != 'INFOBLOX' + return _allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4) + +def allocate_service_ipv6_network(service_type): + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + # sanity: verify the service_type is a proper one + assert hasattr(ipam_params, service_type) and service_type != 'INFOBLOX' + return _allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6) + +def delete_network(network): + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + network_info = find_networks(network=network) + assert len(network_info) == 1, "Network does not exist." + assert '_ref' in network_info[0] + + r = requests.delete( + f'{_wapi(infoblox_params)}/{network_info[0]["_ref"]}', + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + # Extract ipv4/ipv6 address from the network reference obtained in the response + r_json = r.json() + network_address = ipaddress.ip_network(r_json.rsplit("/", 1)[0].split(":")[1]) + if isinstance(network_address, ipaddress.IPv4Network): + return V4ServiceNetwork(v4=network_address) + elif isinstance(network_address, ipaddress.IPv6Network): + return V6ServiceNetwork(v6=network_address) + +def _find_next_available_ip(infoblox_params, network_ref): + r = requests.post( + f'{_wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1', + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert 'ips' in r.json() + received_ip = r.json()['ips'] + assert len(received_ip) == 1 + return received_ip[0] + +def allocate_host(mac=None, hostname=None, ipv4addr=None, network_cidr=None): + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + + if network_cidr: + # Find the next available IP address in the network + network_info = find_networks(network=network_cidr) + assert len(network_info) == 1, "Network does not exist. Create it first." + assert '_ref' in network_info[0] + ipv4addr = _find_next_available_ip(infoblox_params, network_info[0]["_ref"]) + + _req_payload = { + "ipv4addrs": [ + { + "ipv4addr": ipv4addr, + "mac": mac + } + ], + "name": hostname, + "configure_for_dns": False, + "view": "default" + } + + r = requests.post( + f'{_wapi(infoblox_params)}/record:host', + json=_req_payload, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert isinstance(r.json(), str) + assert r.json().startswith("record:host/") + + return V4HostAddress(v4=ipv4addr) + +def delete_host_by_ip(ipv4addr): + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + + # Find host record reference + r = requests.get( + f'{_wapi(infoblox_params)}/record:host', + params={'ipv4addr': ipv4addr}, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False) + host_data = r.json() + assert len(host_data) == 1, "Host does not exist." + assert '_ref' in host_data[0] + host_ref = host_data[0]['_ref'] + + # Delete it + r = requests.delete( + f'{_wapi(infoblox_params)}/{host_ref}', + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False) + r.raise_for_status() + return V4HostAddress(v4=ipv4addr) + + +if __name__ == '__main__': + while True: + print("1. Find all containers") + print("2. Find all networks") + print("3. Create new network") + print("4. Delete network") + print("5. Allocate host by IP") + print("6. Allocate host by network CIDR") + print("7. Delete host by IP") + print("8. Exit") + + choice = input("Enter your choice: ") + + if choice == '1': + containers = find_containers() + print(json.dumps(containers, indent=2)) + + elif choice == '2': + all_networks = find_networks() + print(json.dumps(all_networks, indent=2)) + + elif choice == '3': + service_type = input("Enter service type: ") + ip_version = int(input("Enter IP version (4 or 6): ")) + + if ip_version == 4: + new_network = allocate_service_ipv4_network(service_type=service_type) + elif ip_version == 6: + new_network = allocate_service_ipv6_network(service_type=service_type) + else: + print("Invalid IP version. Please enter either 4 or 6.") + continue + + print(json.dumps(str(new_network), indent=2)) + + elif choice == '4': + network = input("Enter network to delete (in CIDR notation): ") + deleted_network = delete_network(network=network) + print(json.dumps(str(deleted_network), indent=2)) + + elif choice == '5': + hostname = input("Enter host name: ") + ipv4addr = input("Enter IPv4 address to allocate: ") + mac = input("Enter mac address: ") + alloc_ip = allocate_host(hostname=hostname, ipv4addr=ipv4addr, mac=mac) + print(json.dumps(str(alloc_ip), indent=2)) + + elif choice == '6': + hostname = input("Enter host name: ") + network_cidr = input("Enter an existing network CIDR to allocate from: ") + mac = input("Enter mac address: ") + alloc_ip = allocate_host(hostname=hostname, network_cidr=network_cidr, mac=mac) + print(json.dumps(str(alloc_ip), indent=2)) + + elif choice == '7': + ipv4addr = input("Enter IPv4 address: ") + deleted_host = delete_host_by_ip(ipv4addr=ipv4addr) + print(json.dumps(str(deleted_host), indent=2)) + + elif choice == '8': + print("Exiting...") + break + + else: + print("Invalid choice. Please try again.")