diff --git a/gso/services/_ipam.py b/gso/services/_ipam.py deleted file mode 100644 index 573c3aef75b79744766c52006c9e784f380442db..0000000000000000000000000000000000000000 --- a/gso/services/_ipam.py +++ /dev/null @@ -1,687 +0,0 @@ -# mypy: ignore-errors -import ipaddress -from enum import Enum -from typing import Union - -import requests -from pydantic import BaseSettings -from requests.auth import HTTPBasicAuth - -from gso import settings - - -class V4ServiceNetwork(BaseSettings): - v4: ipaddress.IPv4Network - - -class V6ServiceNetwork(BaseSettings): - v6: ipaddress.IPv6Network - - -class ServiceNetworks(BaseSettings): - v4: ipaddress.IPv4Network - v6: ipaddress.IPv6Network - - -class V4HostAddress(BaseSettings): - v4: ipaddress.IPv4Address - - -class V6HostAddress(BaseSettings): - v6: ipaddress.IPv6Address - - -class HostAddresses(BaseSettings): - v4: ipaddress.IPv4Address - v6: ipaddress.IPv6Address - - -class IPAMErrors(Enum): - # HTTP error code, match in error message - CONTAINER_FULL = 400, "Can not find requested number of networks" - NETWORK_FULL = 400, "Cannot find 1 available IP address(es) in this network" - EXTATTR_UNKNOWN = 400, "Unknown extensible attribute" - EXTATTR_BADVALUE = 400, "Bad value for extensible attribute" - - -# TODO: remove this! -# lab infoblox cert is not valid for the ipv4 address -# ... disable warnings for now -requests.packages.urllib3.disable_warnings() - - -def _match_error_code(response, error_code): - return response.status_code == error_code.value[0] and error_code.value[1] in response.text - - -def _wapi(infoblox_params: settings.InfoBloxParams): - return f"https://{infoblox_params.host}" f"/wapi/{infoblox_params.wapi_version}" - - -def _ip_addr_version(addr): - ip_version = None - ip_addr = ipaddress.ip_address(addr) - if isinstance(ip_addr, ipaddress.IPv4Address): - ip_version = 4 - elif isinstance(ip_addr, ipaddress.IPv6Address): - ip_version = 6 - assert ip_version in [4, 6] - return ip_version - - -def _ip_network_version(network): - ip_version = None - ip_network = ipaddress.ip_network(network) - if isinstance(ip_network, ipaddress.IPv4Network): - ip_version = 4 - elif isinstance(ip_network, ipaddress.IPv6Network): - ip_version = 6 - assert ip_version in [4, 6] - return ip_version - - -def _assert_host_in_service( - ipv4_addr="", - ipv6_addr="", - oss_ipv4_containers=None, - oss_ipv6_containers=None, - oss_ipv4_networks=None, - oss_ipv6_networks=None, -): - # IPv4 - if oss_ipv4_containers: - assert any( - ipv4_addr in oss_ipv4_container for oss_ipv4_container in oss_ipv4_containers - ), "Host's IPv4 address doesn't belong to service type." - else: - assert any( - ipv4_addr in oss_ipv4_network for oss_ipv4_network in oss_ipv4_networks - ), "Host's IPv4 address doesn't belong to service type." - - # IPv6 - if oss_ipv6_containers: - assert any( - ipv6_addr in oss_ipv6_container for oss_ipv6_container in oss_ipv6_containers - ), "Host's IPv6 address doesn't belong to service type." - else: - assert any( - ipv6_addr in oss_ipv6_network for oss_ipv6_network in oss_ipv6_networks - ), "Host's IPv6 address doesn't belong to service type." - - -def _find_networks(network_container=None, network=None, ip_version=4): - """If network_container is not None, find all networks within the specified - container. - Otherwise, if network is not None, find the specified network. - Otherwise find all networks. - A list of all found networks is returned (an HTTP 200 code - may be returned with an empty list.). - """ - assert ip_version in [4, 6] - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - endpoint = "network" if ip_version == 4 else "ipv6network" - params = None - if network_container: - params = {"network_container": network_container} - elif network: - params = {"network": network} - r = requests.get( - f"{_wapi(infoblox_params)}/{endpoint}", - params=params, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - return r.json() - - -def _allocate_network( - infoblox_params: settings.InfoBloxParams, - network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams], - ip_version=4, - comment="", - extattrs={}, -) -> Union[V4ServiceNetwork, V6ServiceNetwork]: - assert ip_version in [4, 6] - endpoint = "network" if ip_version == 4 else "ipv6network" - ip_container = "networkcontainer" if ip_version == 4 else "ipv6networkcontainer" - - assert network_params.containers, ( - "No containers available to allocate networks for this service." - "Maybe you want to allocate a host from a network directly?" - ) - - # only return in the response the allocated network, not all available - # TODO: any validation needed for extrattrs wherever it's used? - req_payload = { - "network": { - "_object_function": "next_available_network", - "_parameters": {"cidr": network_params.mask}, - "_object": ip_container, - "_object_parameters": {"network": str(network_params.containers[0])}, - "_result_field": "networks", - }, - "comment": comment, - "extattrs": extattrs, - } - - container_index = 0 - while True: - r = requests.post( - f"{_wapi(infoblox_params)}/{endpoint}", - params={"_return_fields": "network"}, - json=req_payload, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - headers={"content-type": "application/json"}, - verify=False, - ) - if not _match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL): - break - # Container full: try with next valid container for service (if any) - container_index += 1 - if len(network_params.containers) < (container_index + 1): - break - req_payload["network"]["_object_parameters"]["network"] = str(network_params.containers[container_index]) - - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - assert "network" in r.json() - allocated_network = r.json()["network"] - if ip_version == 4: - return V4ServiceNetwork(v4=ipaddress.ip_network(allocated_network)) - else: - return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network)) - - -def allocate_service_ipv4_network(service_type="", comment="", extattrs={}) -> V4ServiceNetwork: - """Allocate IPv4 network within the container of the specified service type.""" - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - return _allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs) - - -def allocate_service_ipv6_network(service_type="", comment="", extattrs={}) -> V6ServiceNetwork: - """Allocate IPv6 network within the container of the specified service type.""" - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - return _allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6, comment, extattrs) - - -def _find_next_available_ip(infoblox_params, network_ref=""): - """Find the next available IP address from a network given its ref. - Returns "NETWORK_FULL" if there's no space in the network. - Otherwise returns the next available IP address in the network. - """ - r = requests.post( - f"{_wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1", # noqa: E501 - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - - if _match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL): - return "NETWORK_FULL" - - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert "ips" in r.json() - received_ip = r.json()["ips"] - assert len(received_ip) == 1 - return received_ip[0] - - -def _allocate_host( - hostname="", addrs=None, networks=None, cname_aliases=[], dns_view="default", extattrs={} -) -> Union[HostAddresses, str]: - """If networks is not None, allocate host in those networks. - Otherwise if addrs is not None, allocate host with those addresses. - hostname parameter must be full name including domain name. - Return "IPV4_NETWORK_FULL" or "IPV6_NETWORK_FULL" - if couldn't allocate host due to requested network being full. - Return "IPV4_NETWORK_NOT_FOUND" or "IPV6_NETWORK_NOT_FOUND" - if couldn't allocate host due to requested network not existing. - """ - # TODO: should hostnames be unique - # (i.e. fail if hostname already exists in this domain/service)? - assert addrs or networks, "You must specify either the host addresses or the networks CIDR." - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - if networks: - ipv4_network = networks[0] - ipv6_network = networks[1] - assert _ip_network_version(ipv4_network) == 4 - assert _ip_network_version(ipv6_network) == 6 - - # Find the next available IP address in each network - network_info = _find_networks(network=ipv4_network, ip_version=4) - if len(network_info) != 1: - return "IPV4_NETWORK_NOT_FOUND" - assert "_ref" in network_info[0] - ipv4_addr = _find_next_available_ip(infoblox_params, network_info[0]["_ref"]) - - network_info = _find_networks(network=ipv6_network, ip_version=6) - if len(network_info) != 1: - return "IPV6_NETWORK_NOT_FOUND" - assert "_ref" in network_info[0] - ipv6_addr = _find_next_available_ip(infoblox_params, network_info[0]["_ref"]) - - # If couldn't find next available IPs, return error - if ipv4_addr == "NETWORK_FULL" or ipv6_addr == "NETWORK_FULL": - if ipv4_addr == "NETWORK_FULL": - return "IPV4_NETWORK_FULL" - if ipv6_addr == "NETWORK_FULL": - return "IPV6_NETWORK_FULL" - - else: - ipv4_addr = addrs[0] - ipv6_addr = addrs[1] - assert _ip_addr_version(ipv4_addr) == 4 - assert _ip_addr_version(ipv6_addr) == 6 - - req_payload = { - "ipv4addrs": [{"ipv4addr": ipv4_addr}], - "ipv6addrs": [{"ipv6addr": ipv6_addr}], - "name": hostname, - "configure_for_dns": True, - "view": dns_view, - "extattrs": extattrs, - } - - r = requests.post( - f"{_wapi(infoblox_params)}/record:host", - json=req_payload, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert isinstance(r.json(), str) - assert r.json().startswith("record:host/") - - if cname_aliases: - cname_req_payload = {"name": "", "canonical": hostname, "view": dns_view, "extattrs": extattrs} - - for alias in cname_aliases: - cname_req_payload["name"] = alias - r = requests.post( - f"{_wapi(infoblox_params)}/record:cname", - json=cname_req_payload, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert r.json().startswith("record:cname/") - - return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr)) - - -def allocate_service_host( - hostname="", - service_type="", - service_networks: ServiceNetworks = None, - host_addresses: HostAddresses = None, - cname_aliases=None, - extattrs={}, -) -> HostAddresses: - """Allocate host record with both IPv4 and IPv6 address, and respective DNS - A and AAAA records. - - If service_networks is provided, and it's in a valid container, - that one is used. - - If service_networks is not provided, and host_addresses is provided, - those specific addresses are used. - - If neither is provided: - - If service has configured containers, new ipv4 and ipv6 networks are - created and those are used. Note that in this case extattrs is for the - hosts and not for the networks. - - If service doesn't have configured containers and has configured - networks instead, the configured networks are used (they are filled up - in order of appearance in the configuration file). - The domain name is taken from the service type and appended to the - specified hostname. - """ - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - - assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers - oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers - oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks - oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks - domain_name = getattr(ipam_params, service_type).domain_name - dns_view = getattr(ipam_params, service_type).dns_view - - assert (oss_ipv4_containers and oss_ipv6_containers) or ( - oss_ipv4_networks and oss_ipv6_networks - ), "This service is missing either containers or networks configuration." - assert domain_name, "This service is missing domain_name configuration." - assert dns_view, "This service is missing dns_view configuration." - - if cname_aliases: - cname_aliases = [alias + domain_name for alias in cname_aliases] - - if not service_networks and not host_addresses: - if oss_ipv4_containers and oss_ipv6_containers: - # This service has configured containers. - # Use them to allocate new networks that can allocate the hosts. - - # IPv4 - ipv4_network = str(allocate_service_ipv4_network(service_type=service_type).v4) - assert ipv4_network, "No available space for IPv4 networks for this service type." - - # IPv6 - ipv6_network = str(allocate_service_ipv6_network(service_type=service_type).v6) - assert ipv6_network, "No available space for IPv6 networks for this service type." - - elif oss_ipv4_networks and oss_ipv6_networks: - # This service has configured networks. - # Allocate a host inside an ipv4 and ipv6 network from among them. - ipv4_network = str(oss_ipv4_networks[0]) - ipv6_network = str(oss_ipv6_networks[0]) - - ipv4_network_index = 0 - ipv6_network_index = 0 - while True: - network_tuple = (ipv4_network, ipv6_network) - host = _allocate_host( - hostname=hostname + domain_name, - networks=network_tuple, - cname_aliases=cname_aliases, - dns_view=dns_view, - extattrs=extattrs, - ) - - if "NETWORK_FULL" not in host and "NETWORK_NOT_FOUND" not in host: - break - elif "IPV4" in host: - ipv4_network_index += 1 - assert oss_ipv4_networks, "No available space in any IPv4 network for this service." - assert ipv4_network_index < len( - oss_ipv4_networks - ), "No available space in any IPv4 network for this service." - ipv4_network = str(oss_ipv4_networks[ipv4_network_index]) - else: # "IPV6" in host - ipv6_network_index += 1 - assert oss_ipv6_networks, "No available space in any IPv6 network for this service." - assert ipv6_network_index < len( - oss_ipv6_networks - ), "No available space in any IPv6 network for this service." - ipv6_network = str(oss_ipv6_networks[ipv6_network_index]) - - elif service_networks: - # IPv4 - ipv4_network = service_networks.v4 - if oss_ipv4_containers: - assert any(ipv4_network.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers) - else: - assert ipv4_network in oss_ipv4_networks - - # IPv6 - ipv6_network = service_networks.v6 - if oss_ipv6_containers: - assert any(ipv6_network.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers) - else: - assert ipv6_network in oss_ipv6_networks - - host = _allocate_host( - hostname=hostname + domain_name, - networks=(str(ipv4_network), str(ipv6_network)), - cname_aliases=cname_aliases, - dns_view=dns_view, - extattrs=extattrs, - ) - assert "NETWORK_FULL" not in host, "Network is full." - assert "NETWORK_NOT_FOUND" not in host, "Network does not exist. Create it first." - - elif host_addresses: - ipv4_addr = host_addresses.v4 - ipv6_addr = host_addresses.v6 - _assert_host_in_service( - ipv4_addr, ipv6_addr, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks - ) - - host = _allocate_host( - hostname=hostname + domain_name, - addrs=(str(ipv4_addr), str(ipv6_addr)), - cname_aliases=cname_aliases, - dns_view=dns_view, - extattrs=extattrs, - ) - assert "NETWORK_FULL" not in host - - return host - - -def delete_service_network(ipnetwork=None, service_type="") -> Union[V4ServiceNetwork, V6ServiceNetwork]: - """Delete IPv4 or IPv6 network by CIDR.""" - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert ipam_params.INFOBLOX - infoblox_params = ipam_params.INFOBLOX - - assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - - network = str(ipnetwork) - ip_version = _ip_network_version(network) - - # Ensure that the network to be deleted is under the service type. - # Otherwise user is not allowed to delete it - if ip_version == 4: - oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers - oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks - if oss_ipv4_containers: - assert any( - ipnetwork.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers - ), "Can't delete: network doesn't belong to service type." - else: - assert ipnetwork in oss_ipv4_networks, "Can't delete: network doesn't belong to service type." - - else: - oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers - oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks - if oss_ipv6_containers: - assert any( - ipnetwork.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers - ), "Can't delete: network doesn't belong to service type." - else: - assert ipnetwork in oss_ipv6_networks, "Can't delete: network doesn't belong to service type." - - network_info = _find_networks(network=network, ip_version=ip_version) - assert len(network_info) == 1, "Network does not exist." - assert "_ref" in network_info[0] - - r = requests.delete( - f'{_wapi(infoblox_params)}/{network_info[0]["_ref"]}', - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - # Extract ipv4/ipv6 address from the network reference obtained in the - # response - r_text = r.text - print(r_text) - network_address = ipaddress.ip_network(r_text.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) - if ip_version == 4: - return V4ServiceNetwork(v4=ipaddress.ip_network(network_address)) - else: - return V6ServiceNetwork(v6=ipaddress.ip_network(network_address)) - - -def delete_service_host( - hostname="", host_addresses: HostAddresses = None, cname_aliases=[], service_type="" -) -> Union[V4HostAddress, V6HostAddress]: - """Delete host record and associated CNAME records. - All arguments passed to this function must match together a host record in - IPAM, and all CNAME records associated to it must also be passed exactly. - """ - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert ipam_params.INFOBLOX - infoblox_params = ipam_params.INFOBLOX - - assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." - oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers - oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers - oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks - oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks - domain_name = getattr(ipam_params, service_type).domain_name - dns_view = getattr(ipam_params, service_type).dns_view - ipv4_addr = str(host_addresses.v4) - ipv6_addr = str(host_addresses.v6) - - _assert_host_in_service( - host_addresses.v4, - host_addresses.v6, - oss_ipv4_containers, - oss_ipv6_containers, - oss_ipv4_networks, - oss_ipv6_networks, - ) - - # Find host record reference - r = requests.get( - f"{_wapi(infoblox_params)}/record:host", - params={ - "name": (hostname + domain_name).lower(), # hostnames are lowercase - "ipv4addr": ipv4_addr, - "ipv6addr": ipv6_addr, - "view": dns_view, - }, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - host_data = r.json() - assert len(host_data) == 1, "Host does not exist." - assert "_ref" in host_data[0] - host_ref = host_data[0]["_ref"] - - # Find cname records reference - r = requests.get( - f"{_wapi(infoblox_params)}/record:cname", - params={ - "canonical": hostname + domain_name, - "view": dns_view, - }, - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - cname_data = r.json() - provided_cnames = [item + domain_name for item in cname_aliases] - found_cnames = [item["name"] for item in cname_data if "name" in item] - assert provided_cnames == found_cnames, "Provided CNAME alias names don't match the ones poiting to hostname." - - # Delete the host record - r = requests.delete( - f"{_wapi(infoblox_params)}/{host_ref}", - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - # Delete the CNAME records - cname_refs = [item["_ref"] for item in cname_data if "name" in item] - for cname_ref in cname_refs: - r = requests.delete( - f"{_wapi(infoblox_params)}/{cname_ref}", - auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), - verify=False, - ) - assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - return host_addresses - - -""" -Below methods are not used for supported outside calls -""" - -''' -def _find_containers(network=None, ip_version=4): - """ - If network is not None, find that container. - Otherwise find all containers. - """ - assert ip_version in [4, 6] - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - endpoint = 'networkcontainer' if ip_version == 4 \ - else 'ipv6networkcontainer' - r = requests.get( - f'{_wapi(infoblox_params)}/{endpoint}', - params={'network': network} if network else None, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - return r.json() - - -def _get_network_capacity(network=None): - """ - Get utilization of a IPv4 network in a fraction of 1000. - """ - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - ip_version = _ip_network_version(network) - assert ip_version == 4, "Utilization is only available for IPv4 networks." - params = { - 'network': network, - '_return_fields': 'network,total_hosts,utilization' - } - - r = requests.get( - f'{_wapi(infoblox_params)}/network', - params=params, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - # Utilization info takes several minutes to converge. - # The IPAM utilization bar in the GUI as well. Why? - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - capacity_info = r.json() - assert len(capacity_info) == 1, "Requested IPv4 network doesn't exist." - assert 'utilization' in capacity_info[0] - utilization = capacity_info[0]['utilization'] - return utilization - - -def _get_network_usage_status(network): - """ - Get status and usage fields of all hosts in the specified ipv4 or ipv6 - network. - """ - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - ip_version = _ip_network_version(network) - endpoint = 'ipv4address' if ip_version == 4 else 'ipv6address' - - r = requests.get( - f'{_wapi(infoblox_params)}/{endpoint}', - params={ - '_return_fields': 'ip_address,status,usage', - 'network': network}, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - return r.json() -''' diff --git a/gso/services/ipam.py b/gso/services/ipam.py index e7018312057b96c03d197a9909624730a1007a4d..a97994d0a022588440319964a666205d351e4e0c 100644 --- a/gso/services/ipam.py +++ b/gso/services/ipam.py @@ -1,11 +1,13 @@ # mypy: ignore-errors import ipaddress +from enum import Enum from typing import Optional, Union +import requests from pydantic import BaseSettings +from requests.auth import HTTPBasicAuth -from gso.services import _ipam -from gso.services._ipam import V4HostAddress, V6HostAddress +from gso import settings class V4ServiceNetwork(BaseSettings): @@ -21,18 +23,588 @@ class ServiceNetworks(BaseSettings): v6: ipaddress.IPv6Network +class V4HostAddress(BaseSettings): + v4: ipaddress.IPv4Address + + +class V6HostAddress(BaseSettings): + v6: ipaddress.IPv6Address + + class HostAddresses(BaseSettings): v4: ipaddress.IPv4Address v6: ipaddress.IPv6Address +class IPAMErrors(Enum): + # HTTP error code, match in error message + CONTAINER_FULL = 400, "Can not find requested number of networks" + NETWORK_FULL = 400, "Cannot find 1 available IP address(es) in this network" + + +# TODO: remove this! +# lab infoblox cert is not valid for the ipv4 address +# ... disable warnings for now +requests.packages.urllib3.disable_warnings() + + +def match_error_code(response, error_code): + return response.status_code == error_code.value[0] and error_code.value[1] in response.text + + +def wapi(infoblox_params: settings.InfoBloxParams): + return f"https://{infoblox_params.host}" f"/wapi/{infoblox_params.wapi_version}" + + +def ip_addr_version(addr): + ip_version = None + ip_addr = ipaddress.ip_address(addr) + if isinstance(ip_addr, ipaddress.IPv4Address): + ip_version = 4 + elif isinstance(ip_addr, ipaddress.IPv6Address): + ip_version = 6 + assert ip_version in [4, 6] + return ip_version + + +def ip_network_version(network): + ip_version = None + ip_network = ipaddress.ip_network(network) + if isinstance(ip_network, ipaddress.IPv4Network): + ip_version = 4 + elif isinstance(ip_network, ipaddress.IPv6Network): + ip_version = 6 + assert ip_version in [4, 6] + return ip_version + + +def assert_host_in_service( + ipv4_addr="", + ipv6_addr="", + oss_ipv4_containers=None, + oss_ipv6_containers=None, + oss_ipv4_networks=None, + oss_ipv6_networks=None, +): + # IPv4 + if oss_ipv4_containers: + assert any( + ipv4_addr in oss_ipv4_container for oss_ipv4_container in oss_ipv4_containers + ), "Host's IPv4 address doesn't belong to service type." + else: + assert any( + ipv4_addr in oss_ipv4_network for oss_ipv4_network in oss_ipv4_networks + ), "Host's IPv4 address doesn't belong to service type." + + # IPv6 + if oss_ipv6_containers: + assert any( + ipv6_addr in oss_ipv6_container for oss_ipv6_container in oss_ipv6_containers + ), "Host's IPv6 address doesn't belong to service type." + else: + assert any( + ipv6_addr in oss_ipv6_network for oss_ipv6_network in oss_ipv6_networks + ), "Host's IPv6 address doesn't belong to service type." + + +def find_networks(network_container=None, network=None, ip_version=4): + """If network_container is not None, find all networks within the specified + container. + Otherwise, if network is not None, find the specified network. + Otherwise find all networks. + A list of all found networks is returned (an HTTP 200 code + may be returned with an empty list.). + """ + assert ip_version in [4, 6] + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + endpoint = "network" if ip_version == 4 else "ipv6network" + params = None + if network_container: + params = {"network_container": network_container} + elif network: + params = {"network": network} + r = requests.get( + f"{wapi(infoblox_params)}/{endpoint}", + params=params, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, + ) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + return r.json() + + +def allocate_network( + infoblox_params: settings.InfoBloxParams, + network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams], + ip_version=4, + comment="", + extattrs={}, +) -> Union[V4ServiceNetwork, V6ServiceNetwork]: + assert ip_version in [4, 6] + endpoint = "network" if ip_version == 4 else "ipv6network" + ip_container = "networkcontainer" if ip_version == 4 else "ipv6networkcontainer" + + assert network_params.containers, ( + "No containers available to allocate networks for this service." + "Maybe you want to allocate a host from a network directly?" + ) + + # only return in the response the allocated network, not all available + # TODO: any validation needed for extrattrs wherever it's used? + req_payload = { + "network": { + "_object_function": "next_available_network", + "_parameters": {"cidr": network_params.mask}, + "_object": ip_container, + "_object_parameters": {"network": str(network_params.containers[0])}, + "_result_field": "networks", + }, + "comment": comment, + "extattrs": extattrs, + } + + container_index = 0 + while True: + r = requests.post( + f"{wapi(infoblox_params)}/{endpoint}", + params={"_return_fields": "network"}, + json=req_payload, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + headers={"content-type": "application/json"}, + verify=False, + ) + if not match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL): + break + # Container full: try with next valid container for service (if any) + container_index += 1 + if len(network_params.containers) < (container_index + 1): + break + req_payload["network"]["_object_parameters"]["network"] = str(network_params.containers[container_index]) + + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + assert "network" in r.json() + allocated_network = r.json()["network"] + if ip_version == 4: + return V4ServiceNetwork(v4=ipaddress.ip_network(allocated_network)) + else: + return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network)) + + +def allocate_service_ipv4_network(service_type="", comment="", extattrs={}) -> V4ServiceNetwork: + """Allocate IPv4 network within the container of the specified service type.""" + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." + return allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs) + + +def allocate_service_ipv6_network(service_type="", comment="", extattrs={}) -> V6ServiceNetwork: + """Allocate IPv6 network within the container of the specified service type.""" + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." + return allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6, comment, extattrs) + + +def find_next_available_ip(infoblox_params, network_ref=""): + """Find the next available IP address from a network given its ref. + Returns "NETWORK_FULL" if there's no space in the network. + Otherwise returns the next available IP address in the network. + """ + r = requests.post( + f"{wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1", # noqa: E501 + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, + ) + + if match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL): + return "NETWORK_FULL" + + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert "ips" in r.json() + received_ip = r.json()["ips"] + assert len(received_ip) == 1 + return received_ip[0] + + +def allocate_host( + hostname="", addrs=None, networks=None, cname_aliases=[], dns_view="default", extattrs={} +) -> Union[HostAddresses, str]: + """If networks is not None, allocate host in those networks. + Otherwise if addrs is not None, allocate host with those addresses. + hostname parameter must be full name including domain name. + Return "IPV4_NETWORK_FULL" or "IPV6_NETWORK_FULL" + if couldn't allocate host due to requested network being full. + Return "IPV4_NETWORK_NOT_FOUND" or "IPV6_NETWORK_NOT_FOUND" + if couldn't allocate host due to requested network not existing. + """ + # TODO: should hostnames be unique + # (i.e. fail if hostname already exists in this domain/service)? + assert addrs or networks, "You must specify either the host addresses or the networks CIDR." + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + + if networks: + ipv4_network = networks[0] + ipv6_network = networks[1] + assert ip_network_version(ipv4_network) == 4 + assert ip_network_version(ipv6_network) == 6 + + # Find the next available IP address in each network + network_info = find_networks(network=ipv4_network, ip_version=4) + if len(network_info) != 1: + return "IPV4_NETWORK_NOT_FOUND" + assert "_ref" in network_info[0] + ipv4_addr = find_next_available_ip(infoblox_params, network_info[0]["_ref"]) + + network_info = find_networks(network=ipv6_network, ip_version=6) + if len(network_info) != 1: + return "IPV6_NETWORK_NOT_FOUND" + assert "_ref" in network_info[0] + ipv6_addr = find_next_available_ip(infoblox_params, network_info[0]["_ref"]) + + # If couldn't find next available IPs, return error + if ipv4_addr == "NETWORK_FULL" or ipv6_addr == "NETWORK_FULL": + if ipv4_addr == "NETWORK_FULL": + return "IPV4_NETWORK_FULL" + if ipv6_addr == "NETWORK_FULL": + return "IPV6_NETWORK_FULL" + + else: + ipv4_addr = addrs[0] + ipv6_addr = addrs[1] + assert ip_addr_version(ipv4_addr) == 4 + assert ip_addr_version(ipv6_addr) == 6 + + req_payload = { + "ipv4addrs": [{"ipv4addr": ipv4_addr}], + "ipv6addrs": [{"ipv6addr": ipv6_addr}], + "name": hostname, + "configure_for_dns": True, + "view": dns_view, + "extattrs": extattrs, + } + + r = requests.post( + f"{wapi(infoblox_params)}/record:host", + json=req_payload, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, + ) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert isinstance(r.json(), str) + assert r.json().startswith("record:host/") + + if cname_aliases: + cname_req_payload = {"name": "", "canonical": hostname, "view": dns_view, "extattrs": extattrs} + + for alias in cname_aliases: + cname_req_payload["name"] = alias + r = requests.post( + f"{wapi(infoblox_params)}/record:cname", + json=cname_req_payload, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, + ) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert r.json().startswith("record:cname/") + + return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr)) + + +def allocate_service_host( + hostname="", + service_type="", + service_networks: ServiceNetworks = None, + host_addresses: HostAddresses = None, + cname_aliases=None, + extattrs={}, +) -> HostAddresses: + """Allocate host record with both IPv4 and IPv6 address, and respective DNS + A and AAAA records. + - If service_networks is provided, and it's in a valid container, + that one is used. + - If service_networks is not provided, and host_addresses is provided, + those specific addresses are used. + - If neither is provided: + - If service has configured containers, new ipv4 and ipv6 networks are + created and those are used. Note that in this case extattrs is for the + hosts and not for the networks. + - If service doesn't have configured containers and has configured + networks instead, the configured networks are used (they are filled up + in order of appearance in the configuration file). + The domain name is taken from the service type and appended to the + specified hostname. + """ + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + + assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." + oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers + oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers + oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks + oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks + domain_name = getattr(ipam_params, service_type).domain_name + dns_view = getattr(ipam_params, service_type).dns_view + + assert (oss_ipv4_containers and oss_ipv6_containers) or ( + oss_ipv4_networks and oss_ipv6_networks + ), "This service is missing either containers or networks configuration." + assert domain_name, "This service is missing domain_name configuration." + assert dns_view, "This service is missing dns_view configuration." + + if cname_aliases: + cname_aliases = [alias + domain_name for alias in cname_aliases] + + if not service_networks and not host_addresses: + if oss_ipv4_containers and oss_ipv6_containers: + # This service has configured containers. + # Use them to allocate new networks that can allocate the hosts. + + # IPv4 + ipv4_network = str(allocate_service_ipv4_network(service_type=service_type).v4) + assert ipv4_network, "No available space for IPv4 networks for this service type." + + # IPv6 + ipv6_network = str(allocate_service_ipv6_network(service_type=service_type).v6) + assert ipv6_network, "No available space for IPv6 networks for this service type." + + elif oss_ipv4_networks and oss_ipv6_networks: + # This service has configured networks. + # Allocate a host inside an ipv4 and ipv6 network from among them. + ipv4_network = str(oss_ipv4_networks[0]) + ipv6_network = str(oss_ipv6_networks[0]) + + ipv4_network_index = 0 + ipv6_network_index = 0 + while True: + network_tuple = (ipv4_network, ipv6_network) + host = allocate_host( + hostname=hostname + domain_name, + networks=network_tuple, + cname_aliases=cname_aliases, + dns_view=dns_view, + extattrs=extattrs, + ) + + if "NETWORK_FULL" not in host and "NETWORK_NOT_FOUND" not in host: + break + elif "IPV4" in host: + ipv4_network_index += 1 + assert oss_ipv4_networks, "No available space in any IPv4 network for this service." + assert ipv4_network_index < len( + oss_ipv4_networks + ), "No available space in any IPv4 network for this service." + ipv4_network = str(oss_ipv4_networks[ipv4_network_index]) + else: # "IPV6" in host + ipv6_network_index += 1 + assert oss_ipv6_networks, "No available space in any IPv6 network for this service." + assert ipv6_network_index < len( + oss_ipv6_networks + ), "No available space in any IPv6 network for this service." + ipv6_network = str(oss_ipv6_networks[ipv6_network_index]) + + elif service_networks: + # IPv4 + ipv4_network = service_networks.v4 + if oss_ipv4_containers: + assert any(ipv4_network.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers) + else: + assert ipv4_network in oss_ipv4_networks + + # IPv6 + ipv6_network = service_networks.v6 + if oss_ipv6_containers: + assert any(ipv6_network.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers) + else: + assert ipv6_network in oss_ipv6_networks + + host = allocate_host( + hostname=hostname + domain_name, + networks=(str(ipv4_network), str(ipv6_network)), + cname_aliases=cname_aliases, + dns_view=dns_view, + extattrs=extattrs, + ) + assert "NETWORK_FULL" not in host, "Network is full." + assert "NETWORK_NOT_FOUND" not in host, "Network does not exist. Create it first." + + elif host_addresses: + ipv4_addr = host_addresses.v4 + ipv6_addr = host_addresses.v6 + assert_host_in_service( + ipv4_addr, ipv6_addr, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks + ) + + host = allocate_host( + hostname=hostname + domain_name, + addrs=(str(ipv4_addr), str(ipv6_addr)), + cname_aliases=cname_aliases, + dns_view=dns_view, + extattrs=extattrs, + ) + assert "NETWORK_FULL" not in host + + return host + + +def delete_service_network( + network: Union[V4ServiceNetwork, V6ServiceNetwork] = None, service_type="" +) -> Union[V4ServiceNetwork, V6ServiceNetwork]: + """Delete IPv4 or IPv6 network by CIDR.""" + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert ipam_params.INFOBLOX + infoblox_params = ipam_params.INFOBLOX + + assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." + + ip_version = ip_network_version(str(network)) + + # Ensure that the network to be deleted is under the service type. + # Otherwise user is not allowed to delete it + if ip_version == 4: + oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers + oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks + if oss_ipv4_containers: + assert any( + network.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers + ), "Can't delete: network doesn't belong to service type." + else: + assert network in oss_ipv4_networks, "Can't delete: network doesn't belong to service type." + + else: + oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers + oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks + if oss_ipv6_containers: + assert any( + network.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers + ), "Can't delete: network doesn't belong to service type." + else: + assert network in oss_ipv6_networks, "Can't delete: network doesn't belong to service type." + + network_info = find_networks(network=str(network), ip_version=ip_version) + assert len(network_info) == 1, "Network does not exist." + assert "_ref" in network_info[0] + + r = requests.delete( + f'{wapi(infoblox_params)}/{network_info[0]["_ref"]}', + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, + ) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + # Extract ipv4/ipv6 address from the network reference obtained in the + # response + r_text = r.text + print(r_text) + network_address = ipaddress.ip_network(r_text.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) + if ip_version == 4: + return V4ServiceNetwork(v4=ipaddress.ip_network(network_address)) + else: + return V6ServiceNetwork(v6=ipaddress.ip_network(network_address)) + + +def delete_service_host( + hostname="", host_addresses: HostAddresses = None, cname_aliases=[], service_type="" +) -> Union[V4HostAddress, V6HostAddress]: + """Delete host record and associated CNAME records. + All arguments passed to this function must match together a host record in + IPAM, and all CNAME records associated to it must also be passed exactly. + """ + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert ipam_params.INFOBLOX + infoblox_params = ipam_params.INFOBLOX + + assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." + oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers + oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers + oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks + oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks + domain_name = getattr(ipam_params, service_type).domain_name + dns_view = getattr(ipam_params, service_type).dns_view + ipv4_addr = str(host_addresses.v4) + ipv6_addr = str(host_addresses.v6) + + assert_host_in_service( + host_addresses.v4, + host_addresses.v6, + oss_ipv4_containers, + oss_ipv6_containers, + oss_ipv4_networks, + oss_ipv6_networks, + ) + + # Find host record reference + r = requests.get( + f"{wapi(infoblox_params)}/record:host", + params={ + "name": (hostname + domain_name).lower(), # hostnames are lowercase + "ipv4addr": ipv4_addr, + "ipv6addr": ipv6_addr, + "view": dns_view, + }, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, + ) + host_data = r.json() + assert len(host_data) == 1, "Host does not exist." + assert "_ref" in host_data[0] + host_ref = host_data[0]["_ref"] + + # Find cname records reference + r = requests.get( + f"{wapi(infoblox_params)}/record:cname", + params={ + "canonical": hostname + domain_name, + "view": dns_view, + }, + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, + ) + cname_data = r.json() + provided_cnames = [item + domain_name for item in cname_aliases] + found_cnames = [item["name"] for item in cname_data if "name" in item] + assert provided_cnames == found_cnames, "Provided CNAME alias names don't match the ones poiting to hostname." + + # Delete the host record + r = requests.delete( + f"{wapi(infoblox_params)}/{host_ref}", + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, + ) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + # Delete the CNAME records + cname_refs = [item["_ref"] for item in cname_data if "name" in item] + for cname_ref in cname_refs: + r = requests.delete( + f"{wapi(infoblox_params)}/{cname_ref}", + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, + ) + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + return host_addresses + + def new_service_networks(service_type: str = "", comment: str = "", extattrs: dict = None) -> ServiceNetworks: if extattrs is None: extattrs = {} - v4_service_network = _ipam.allocate_service_ipv4_network( + v4_service_network = allocate_service_ipv4_network( service_type=service_type, comment=comment, extattrs=extattrs ) - v6_service_network = _ipam.allocate_service_ipv6_network( + v6_service_network = allocate_service_ipv6_network( service_type=service_type, comment=comment, extattrs=extattrs ) return ServiceNetworks(v4=v4_service_network.v4, v6=v6_service_network.v6) @@ -48,7 +620,7 @@ def new_service_host( ) -> HostAddresses: if extattrs is None: extattrs = {} - return _ipam.allocate_service_host( + return allocate_service_host( hostname=hostname, service_type=service_type, service_networks=service_networks, @@ -56,19 +628,3 @@ def new_service_host( cname_aliases=cname_aliases, extattrs=extattrs, ) - - -def delete_service_network( - network: ipaddress.ip_network = None, service_type: str = "" -) -> Union[V4ServiceNetwork, V6ServiceNetwork]: - return _ipam.delete_service_network(ipnetwork=network, service_type=service_type) - - -def delete_service_host( - hostname: str = "", host_addresses: HostAddresses = None, cname_aliases=None, service_type: str = "" -) -> V4HostAddress | V6HostAddress: - if cname_aliases is None: - cname_aliases = [] - return _ipam.delete_service_host( - hostname=hostname, host_addresses=host_addresses, cname_aliases=cname_aliases, service_type=service_type - )