diff --git a/gso/oss-params-example.json b/gso/oss-params-example.json index 5f533ef63b3ef1e319a8a49cbc294077a5614733..9c48d63db1dd48d0fbc489b8260aae536ee3b641 100644 --- a/gso/oss-params-example.json +++ b/gso/oss-params-example.json @@ -12,87 +12,34 @@ "password": "robot-user-password" }, "LO": { - "V4": { - "containers": [ - "1.1.0.0/24" - ], - "networks": [], - "mask": 32 - }, - "V6": { - "containers": [ - "dead:beef::/64" - ], - "networks": [], - "mask": 128 - }, - "domain_name": ".lo" + "V4": {"containers": [], "networks": ["1.1.0.0/24"], "mask": 0}, + "V6": {"containers": [], "networks": ["dead:beef::/64"], "mask": 0}, + "domain_name": ".lo", + "dns_view": "default" }, "TRUNK": { - "V4": { - "containers": [ - "1.1.1.0/24" - ], - "networks": [], - "mask": 31 - }, - "V6": { - "containers": [ - "dead:beef::/64" - ], - "networks": [], - "mask": 126 - }, - "domain_name": ".trunk" + "V4": {"containers": ["1.1.1.0/24"], "networks": [], "mask": 31}, + "V6": {"containers": ["dead:beef::/64"], "networks": [], "mask": 126}, + "domain_name": ".trunk", + "dns_view": "default" }, "GEANT_IP": { - "V4": { - "containers": [ - "1.1.2.0/24" - ], - "networks": [], - "mask": 31 - }, - "V6": { - "containers": [ - "dead:beef::/64" - ], - "networks": [], - "mask": 126 - }, - "domain_name": ".geantip" + "V4": {"containers": ["1.1.2.0/24"], "networks": [], "mask": 31}, + "V6": {"containers": ["dead:beef::/64"], "networks": [], "mask": 126}, + "domain_name": ".geantip", + "dns_view": "default" }, "SI": { - "V4": { - "containers": [ - "10.255.253.128/25" - ], - "networks": [], - "mask": 31 - }, - "V6": { - "containers": [], - "networks": [], - "mask": 126 - }, - "domain_name": ".geantip" + "V4": {"containers": ["1.1.3.0/24"], "networks": [], "mask": 31}, + "V6": {"containers": ["dead:beef::/64"], "networks": [], "mask": 126}, + "domain_name": ".si", + "dns_view": "default" }, "LT_IAS": { - "V4": { - "containers": [ - "10.255.255.0/24" - ], - "networks": [], - "mask": 31 - }, - "V6": { - "containers": [ - "dead:beef:cc::/48" - ], - "networks": [], - "mask": 126 - }, - "domain_name": ".geantip" + "V4": {"containers": ["1.1.4.0/24"], "networks": [], "mask": 31}, + "V6": {"containers": ["dead:beef::/64"], "networks": [], "mask": 126}, + "domain_name": ".ltias", + "dns_view": "default" } }, "PROVISIONING_PROXY": { diff --git a/gso/services/README.md b/gso/services/README.md new file mode 100644 index 0000000000000000000000000000000000000000..741fa16bf2e01e359d94cedb2634c0551f6d4b8c --- /dev/null +++ b/gso/services/README.md @@ -0,0 +1,114 @@ +## IPAM + +### Example configuration +The following kind of configuration file needs to be exported in the variable `OSS_PARAMS_FILENAME`. + +``` + "LO": { + "V4": {"containers": [], "networks": ["10.255.255.32/32", "10.255.255.0/28", "10.255.255.16/28"], "mask": 0}, + "V6": {"containers": [], "networks": ["dead:beef::/80", "dead:beef:0:1::/80"], "mask": 0}, + "domain_name": ".gso" + }, + "TRUNK": { + "V4": {"containers": ["10.255.255.0/24", "10.255.254.0/24"], "networks": [], "mask": 31}, + "V6": {"containers": ["dead:beef::/64", "dead:beee::/64"], "networks": [], "mask": 126}, + "domain_name": ".gso" + }, +``` +Either a non-empty list of `networks` or a non-empty list of `containers` is required in the params file per service type. Having both is redundant, but in that case containers take precedence. The `mask` parameter is irrelevant if the service type uses networks instead of containers. + +### Host/network allocation +Hosts can be allocated through `ipam.new_service_host()` resulting in a host record (IPv4, IPv6, A, AAAA) and zero or more cname records. This function can be called by passing domain name, hostname, and service type (mandatory). Optinally, you can pass either ipv4/ipv6 addresses, ipv4/ipv6 networks, or nothing. Passing nothing is considered default. In that case: + +- If the service type has any container specified in the params file, networks have to be allocated before allocating hosts (through `ipam.new_service_networks()`). The mask parameter is used to create the new networks. Containers are filled up in the order that they appear in the params file. +- If the service type has any network specified in the params file, those networks are used directly to allocate hosts. Networks are filled up in the order that they appear in the params file. + +If you pass addresses or networks, the module will always attempt to use those, but will fail if they don't match the configuration in the params file for the service type (i.e. if you request an address or network that places outside of the configured containers/networks). + +Networks and hosts can be allocated with extensible attributes. Networks can be created with a comment. CNAME records can be optionally created. + +### Host/network deletion +The code checks that the resource you are trying to delete is allocated (in terms of IP address space) to the service as per `containers` or `networks` in the service's configuration. If not, you are not allowed to delete it. + +No host or cname records are deleted via the `delete_service_host()` function until all passed arguments match the hostname, IP, and CNAME data of the host. If anything doesn't match, no record is deleted. + +### Usage examples + +#### Host/network allocation + The following is a sample call flow to allocate two loopback interfaces and a trunk service. It assumes the _Example configuration_ from the above section is used, where TRUNK has configured containers, and LO has configured networks. + +In this example: + - Host hA for service LO uses specific ipv4/ipv6 address pair. + - Host hB for service LO uses nothing (just the service type). + - Host hA for service TRUNK uses a specific ipv4/ipv6 address pair. + - Host hB for service TRUNK uses the ipv4/ipv6 network pair. + + Because TRUNK has configured containers rather than networks, `new_service_networks()` must be called first to create a network for the hA-hB TRUNK. This is not needed for LO, which automatically uses the first configured network with available space to allocate new hosts. + +``` + hostname_A = 'hA' + hostname_B = 'hB' + + # hA LO (loopback) + loA_v4_host_address = ipaddress.ip_address('10.255.255.0') + loA_v6_host_address = ipaddress.ip_address('dead:beef::0') + loA_host_addresses = HostAddresses(v4=loA_v4_host_address, + v6=loA_v6_host_address) + new_service_host(hostname=hostname_A+"_LO", + host_addresses=loA_host_addresses, + cname_aliases=["alias1.hA", "alias2.hA"], + service_type='LO') + + # hB LO (loopback) + new_service_host(hostname=hostname_B+"_LO", + cname_aliases=["alias1.hB"], + service_type='LO') + + # hA-hB TRUNK + trunkAB_network_extattrs = { + "vrf_name": {"value": "dummy_vrf"}, + } + trunkAB_host_extattrs = { + "Site": {"value": "dummy_site"}, + } + trunkAB_service_networks = new_service_networks( + service_type='TRUNK', + extattrs=trunkAB_network_extattrs, + comment="Network for hA-hB TRUNK" + ) + + trunkAB_v4_host_address = trunkAB_service_networks.v4.network_address + trunkAB_v6_host_address = trunkAB_service_networks.v6.network_address + trunkAB_host_addresses = HostAddresses(v4=trunkAB_v4_host_address, + v6=trunkAB_v6_host_address) + + new_service_host(hostname=hostname_A+"_TRUNK", + service_type='TRUNK', + host_addresses=trunkAB_host_addresses, + extattrs=trunkAB_host_extattrs) + + new_service_host(hostname=hostname_B+"_TRUNK", + service_type='TRUNK', + service_networks=trunkAB_service_networks, + extattrs=trunkAB_host_extattrs) +``` + +#### Host/network deletion +``` + # Delete network + service_network = ipam.delete_service_network( + network=ipaddress.ip_network('10.255.255.0/26'), service_type='LO' + ) + + # Delete host + input_host_addresses = ipam.HostAddresses( + v4=ipaddress.ip_address('10.255.255.1'), + v6=ipaddress.ip_address('dead:beef::1') + ) + host_addresses = ipam.delete_service_host( + hostname='ha_lo', + host_addresses=input_host_addresses, + cname_aliases=['alias1.ha', 'alias2.ha'], + service_type='LO' + ) +``` diff --git a/gso/services/_ipam.py b/gso/services/_ipam.py index 42bc3d7004acd687c95798e86c5cb02b5c8187df..dc7df6837e91175f5cff54d4d94d48040bb215a7 100644 --- a/gso/services/_ipam.py +++ b/gso/services/_ipam.py @@ -1,678 +1,770 @@ -import ipaddress -import requests -from enum import Enum -from pydantic import BaseSettings -from requests.auth import HTTPBasicAuth -from typing import Union - -from gso import settings - - -class V4ServiceNetwork(BaseSettings): - v4: ipaddress.IPv4Network - - -class V6ServiceNetwork(BaseSettings): - v6: ipaddress.IPv6Network - - -class ServiceNetworks(BaseSettings): - v4: ipaddress.IPv4Network - v6: ipaddress.IPv6Network - - -class V4HostAddress(BaseSettings): - v4: ipaddress.IPv4Address - - -class V6HostAddress(BaseSettings): - v6: ipaddress.IPv6Address - - -class HostAddresses(BaseSettings): - v4: ipaddress.IPv4Address - v6: ipaddress.IPv6Address - - -class IPAMErrors(Enum): - # HTTP error code, match in error message - CONTAINER_FULL = 400, "Can not find requested number of networks" - NETWORK_FULL = 400, \ - "Cannot find 1 available IP address(es) in this network" - EXTATTR_UNKNOWN = 400, "Unknown extensible attribute" - EXTATTR_BADVALUE = 400, "Bad value for extensible attribute" - - -# TODO: remove this! -# lab infoblox cert is not valid for the ipv4 address -# ... disable warnings for now -requests.packages.urllib3.disable_warnings() - - -def _match_error_code(response, error_code): - return response.status_code == error_code.value[0] \ - and error_code.value[1] in response.text - - -def _wapi(infoblox_params: settings.InfoBloxParams): - return (f'https://{infoblox_params.host}' - f'/wapi/{infoblox_params.wapi_version}') - - -def _ip_addr_version(addr): - ip_version = None - ip_addr = ipaddress.ip_address(addr) - if isinstance(ip_addr, ipaddress.IPv4Address): - ip_version = 4 - elif isinstance(ip_addr, ipaddress.IPv6Address): - ip_version = 6 - assert ip_version in [4, 6] - return ip_version - - -def _ip_network_version(network): - ip_version = None - ip_network = ipaddress.ip_network(network) - if isinstance(ip_network, ipaddress.IPv4Network): - ip_version = 4 - elif isinstance(ip_network, ipaddress.IPv6Network): - ip_version = 6 - assert ip_version in [4, 6] - return ip_version - - -def _find_networks(network_container=None, network=None, ip_version=4): - """ - If network_container is not None, find all networks within the specified - container. - Otherwise, if network is not None, find the specified network. - Otherwise find all networks. - """ - assert ip_version in [4, 6] - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - endpoint = 'network' if ip_version == 4 else 'ipv6network' - params = None - if network_container: - params = {'network_container': network_container} - elif network: - params = {'network': network} - r = requests.get( - f'{_wapi(infoblox_params)}/{endpoint}', - params=params, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - # TODO: propagate "network not found" error to caller - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - return r.json() - - -def _allocate_network( - infoblox_params: settings.InfoBloxParams, - network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams], - ip_version=4, - comment="", - extattrs={} -) -> Union[V4ServiceNetwork, V6ServiceNetwork]: - assert ip_version in [4, 6] - endpoint = 'network' if ip_version == 4 else 'ipv6network' - ip_container = 'networkcontainer' if ip_version == 4 else \ - 'ipv6networkcontainer' - - assert network_params.containers, \ - "No containers available to allocate networks for this service." \ - "Maybe you want to allocate a host from a network directly?" - - # only return in the response the allocated network, not all available - # TODO: any validation needed for extrattrs wherever it's used? - req_payload = { - "network": { - "_object_function": "next_available_network", - "_parameters": { - "cidr": network_params.mask - }, - "_object": ip_container, - "_object_parameters": { - "network": str(network_params.containers[0]) - }, - "_result_field": "networks", - }, - "comment": comment, - "extattrs": extattrs - } - - container_index = 0 - while True: - r = requests.post( - f'{_wapi(infoblox_params)}/{endpoint}', - params={'_return_fields': 'network'}, - json=req_payload, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - headers={'content-type': "application/json"}, - verify=False - ) - if not _match_error_code(response=r, - error_code=IPAMErrors.CONTAINER_FULL): - break - # Container full: try with next valid container for service (if any) - container_index += 1 - if len(network_params.containers) < (container_index + 1): - break - req_payload["network"]["_object_parameters"]["network"] = \ - str(network_params.containers[container_index]) - - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - assert 'network' in r.json() - allocated_network = r.json()['network'] - if ip_version == 4: - return V4ServiceNetwork(v4=allocated_network) - else: - return V6ServiceNetwork(v6=allocated_network) - - -def allocate_service_ipv4_network(service_type, comment="", extattrs={} - ) -> V4ServiceNetwork: - """ - Allocate IPv4 network within the container of the specified service type. - """ - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert hasattr(ipam_params, service_type) \ - and service_type != 'INFOBLOX', "Invalid service type." - return _allocate_network(ipam_params.INFOBLOX, - getattr(ipam_params, service_type).V4, - 4, - comment, - extattrs) - - -def allocate_service_ipv6_network(service_type, comment="", extattrs={} - ) -> V6ServiceNetwork: - """ - Allocate IPv6 network within the container of the specified service type. - """ - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert hasattr(ipam_params, service_type) \ - and service_type != 'INFOBLOX', "Invalid service type." - return _allocate_network(ipam_params.INFOBLOX, - getattr(ipam_params, service_type).V6, - 6, - comment, - extattrs) - - -def _find_next_available_ip(infoblox_params, network_ref): - """ - Find the next available IP address from a network given its ref. - Returns "NETWORK_FULL" if there's no space in the network. - Otherwise returns the next available IP address in the network. - """ - r = requests.post( - f'{_wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1', # noqa: E501 - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - - if _match_error_code(response=r, - error_code=IPAMErrors.NETWORK_FULL): - return "NETWORK_FULL" - - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert 'ips' in r.json() - received_ip = r.json()['ips'] - assert len(received_ip) == 1 - return received_ip[0] - - -def _allocate_host(hostname=None, - addrs=None, - networks=None, - cname_aliases=None, - extattrs={} - ) -> Union[HostAddresses, str]: - """ - If networks is not None, allocate host in those networks. - Otherwise if addrs is not None, allocate host with those addresses. - hostname parameter must be full name including domain name. - Return an error string if couldn't allocate host due to network full. - """ - # TODO: should hostnames be unique - # (i.e. fail if hostname already exists in this domain/service)? - assert addrs or networks, \ - "You must specify either the host addresses or the networks CIDR." - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - if networks: - ipv4_network = networks[0] - ipv6_network = networks[1] - assert _ip_network_version(ipv4_network) == 4 - assert _ip_network_version(ipv6_network) == 6 - - # Find the next available IP address in each network - network_info = _find_networks(network=ipv4_network, ip_version=4) - assert len(network_info) == 1, \ - "IPv4 Network does not exist. Create it first." - assert '_ref' in network_info[0] - ipv4_addr = _find_next_available_ip(infoblox_params, - network_info[0]["_ref"]) - - network_info = _find_networks(network=ipv6_network, ip_version=6) - assert len(network_info) == 1, \ - "IPv6 Network does not exist. Create it first." - assert '_ref' in network_info[0] - ipv6_addr = _find_next_available_ip(infoblox_params, - network_info[0]["_ref"]) - - # If couldn't find next available IPs, return error - if ipv4_addr == "NETWORK_FULL" or ipv6_addr == "NETWORK_FULL": - if ipv4_addr == "NETWORK_FULL": - return "IPV4_NETWORK_FULL" - if ipv6_addr == "NETWORK_FULL": - return "IPV6_NETWORK_FULL" - - else: - ipv4_addr = addrs[0] - ipv6_addr = addrs[1] - assert _ip_addr_version(ipv4_addr) == 4 - assert _ip_addr_version(ipv6_addr) == 6 - - req_payload = { - "ipv4addrs": [ - { - "ipv4addr": ipv4_addr - } - ], - "ipv6addrs": [ - { - "ipv6addr": ipv6_addr - } - ], - "name": hostname, - "configure_for_dns": True, - "view": "default", - "extattrs": extattrs - } - - r = requests.post( - f'{_wapi(infoblox_params)}/record:host', - json=req_payload, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert isinstance(r.json(), str) - assert r.json().startswith("record:host/") - - if cname_aliases: - - cname_req_payload = { - "name": "", - "canonical": hostname, - "view": "default", - "extattrs": extattrs - } - - for alias in cname_aliases: - cname_req_payload["name"] = alias - r = requests.post( - f'{_wapi(infoblox_params)}/record:cname', - json=cname_req_payload, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert r.json().startswith("record:cname/") - - return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), - v6=ipaddress.ip_address(ipv6_addr)) - - -def allocate_service_host(hostname=None, - service_type=None, - service_networks: ServiceNetworks = None, - host_addresses: HostAddresses = None, - cname_aliases=None, - extattrs={} - ) -> HostAddresses: - """ - Allocate host record with both IPv4 and IPv6 address, and respective DNS - A and AAAA records. - - If service_networks is provided, and it's in a valid container, - that one is used. - - If service_networks is not provided, and host_addresses is provided, - those specific addresses are used. - - If neither is provided: - - If service has configured containers, new ipv4 and ipv6 networks are - created and those are used. Note that in this case extattrs is for the - hosts and not for the networks. - - If service doesn't have configured containers and has configured - networks instead, the configured networks are used (they are filled up - in order of appearance in the configuration file). - The domain name is taken from the service type and appended to the - specified hostname. - """ - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - - assert hasattr(ipam_params, service_type) \ - and service_type != 'INFOBLOX', "Invalid service type." - oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers - oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers - oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks - oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks - domain_name = getattr(ipam_params, service_type).domain_name - - assert (oss_ipv4_containers and oss_ipv6_containers) \ - or (oss_ipv4_networks and oss_ipv6_networks), \ - "This service is missing either containers or networks configuration." - assert domain_name, "This service is missing domain_name configuration." - - if cname_aliases: - cname_aliases = [alias + domain_name for alias in cname_aliases] - - if not service_networks and not host_addresses: - if oss_ipv4_containers and oss_ipv6_containers: - # This service has configured containers. - # Use them to allocate new networks that can allocate the hosts. - - # IPv4 - ipv4_network = str(allocate_service_ipv4_network( - service_type=service_type).v4) - assert ipv4_network, \ - "No available space for IPv4 networks for this service type." - - # IPv6 - ipv6_network = str(allocate_service_ipv6_network( - service_type=service_type).v6) - assert ipv6_network, \ - "No available space for IPv6 networks for this service type." - - elif oss_ipv4_networks and oss_ipv6_networks: - # This service has configured networks. - # Allocate a host inside an ipv4 and ipv6 network from among them. - ipv4_network = str(oss_ipv4_networks[0]) - ipv6_network = str(oss_ipv6_networks[0]) - - while True: - ipv4_network_index = 0 - ipv6_network_index = 0 - network_tuple = (ipv4_network, ipv6_network) - host = _allocate_host(hostname=hostname+domain_name, - networks=network_tuple, - cname_aliases=cname_aliases, - extattrs=extattrs) - - if "NETWORK_FULL" not in host: - break - elif "IPV4" in host: - ipv4_network_index += 1 - assert ipv4_network_index < len(oss_ipv4_networks), \ - "No available space in any IPv4 network for this service." - ipv4_network = str(oss_ipv4_networks[ipv4_network_index]) - else: # IPV6 in host - ipv6_network_index += 1 - assert ipv6_network_index < len(oss_ipv6_networks), \ - "No available space in any IPv6 network for this service." - ipv6_network = str(oss_ipv6_networks[ipv6_network_index]) - - elif service_networks: - # IPv4 - ipv4_network = service_networks.v4 - if oss_ipv4_containers: - assert any(ipv4_network.subnet_of(oss_ipv4_container) - for oss_ipv4_container in oss_ipv4_containers) - else: - assert ipv4_network in oss_ipv4_networks - - # IPv6 - ipv6_network = service_networks.v6 - if oss_ipv6_containers: - assert any(ipv6_network.subnet_of(oss_ipv6_container) - for oss_ipv6_container in oss_ipv6_containers) - else: - assert ipv6_network in oss_ipv6_networks - - host = _allocate_host( - hostname=hostname+domain_name, - networks=(str(ipv4_network), str(ipv6_network)), - cname_aliases=cname_aliases, - extattrs=extattrs - ) - assert "NETWORK_FULL" not in host - - elif host_addresses: - # IPv4 - ipv4_addr = host_addresses.v4 - if oss_ipv4_containers: - assert any(ipv4_addr in oss_ipv4_container - for oss_ipv4_container in oss_ipv4_containers) - else: - assert any(ipv4_addr in oss_ipv4_network - for oss_ipv4_network in oss_ipv4_networks) - - # IPv6 - ipv6_addr = host_addresses.v6 - if oss_ipv6_containers: - assert any(ipv6_addr in oss_ipv6_container - for oss_ipv6_container in oss_ipv6_containers) - else: - assert any(ipv4_addr in oss_ipv6_network - for oss_ipv6_network in oss_ipv6_networks) - - host = _allocate_host( - hostname=hostname+domain_name, - addrs=(str(ipv4_addr), str(ipv6_addr)), - cname_aliases=cname_aliases, - extattrs=extattrs - ) - assert "NETWORK_FULL" not in host - - return host - - -""" -Below methods are not used for supported outside calls -""" - -''' -def _find_containers(network=None, ip_version=4): - """ - If network is not None, find that container. - Otherwise find all containers. - """ - assert ip_version in [4, 6] - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - endpoint = 'networkcontainer' if ip_version == 4 \ - else 'ipv6networkcontainer' - r = requests.get( - f'{_wapi(infoblox_params)}/{endpoint}', - params={'network': network} if network else None, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - return r.json() - - -def _get_network_capacity(network=None): - """ - Get utilization of a IPv4 network in a fraction of 1000. - """ - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - ip_version = _ip_network_version(network) - assert ip_version == 4, "Utilization is only available for IPv4 networks." - params = { - 'network': network, - '_return_fields': 'network,total_hosts,utilization' - } - - r = requests.get( - f'{_wapi(infoblox_params)}/network', - params=params, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - # Utilization info takes several minutes to converge. - # The IPAM utilization bar in the GUI as well. Why? - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - capacity_info = r.json() - assert len(capacity_info) == 1, "Requested IPv4 network doesn't exist." - assert 'utilization' in capacity_info[0] - utilization = capacity_info[0]['utilization'] - return utilization - - -def _delete_network(network) -> Union[V4ServiceNetwork, V6ServiceNetwork]: - """ - Delete IPv4 or IPv6 network by CIDR. - """ - # TODO: should we check that there are no hosts in this network before - # deleting? Deleting a network deletes the hosts in it, but not the - # associated DNS records. - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - ip_version = _ip_network_version(network) - - network_info = _find_networks(network=network, ip_version=ip_version) - assert len(network_info) == 1, "Network does not exist." - assert '_ref' in network_info[0] - - r = requests.delete( - f'{_wapi(infoblox_params)}/{network_info[0]["_ref"]}', - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - # Extract ipv4/ipv6 address from the network reference obtained in the - # response - r_json = r.json() - network_address = ipaddress.ip_network( - r_json.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) - if ip_version == 4: - return V4ServiceNetwork(v4=network_address) - else: - return V6ServiceNetwork(v6=network_address) - - -def _delete_host_by_ip(addr) -> Union[V4HostAddress, V6HostAddress]: - """ - Delete IPv4 or IPv6 host by its address. - """ - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - ip_version = _ip_addr_version(addr) - ip_param = 'ipv4addr' if ip_version == 4 else 'ipv6addr' - - # Find host record reference - r = requests.get( - f'{_wapi(infoblox_params)}/record:host', - params={ip_param: addr}, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - host_data = r.json() - assert len(host_data) == 1, "Host does not exist." - assert '_ref' in host_data[0] - host_ref = host_data[0]['_ref'] - - # Delete it - r = requests.delete( - f'{_wapi(infoblox_params)}/{host_ref}', - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - # Also find and delete the associated dns a/aaaa record - endpoint = 'record:a' if ip_version == 4 else 'record:aaaa' - - r = requests.get( - f'{_wapi(infoblox_params)}/{endpoint}', - params={ip_param: addr}, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - dns_data = r.json() - assert len(dns_data) == 1, "DNS record does not exist." - assert '_ref' in dns_data[0] - dns_ref = dns_data[0]['_ref'] - - r = requests.delete( - f'{_wapi(infoblox_params)}/{dns_ref}', - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - if ip_version == 4: - return V4HostAddress(v4=addr) - else: - return V6HostAddress(v6=addr) - - -def _get_network_usage_status(network): - """ - Get status and usage fields of all hosts in the specified ipv4 or ipv6 - network. - """ - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - ip_version = _ip_network_version(network) - endpoint = 'ipv4address' if ip_version == 4 else 'ipv6address' - - r = requests.get( - f'{_wapi(infoblox_params)}/{endpoint}', - params={ - '_return_fields': 'ip_address,status,usage', - 'network': network}, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - return r.json() -''' +import ipaddress +import requests +from enum import Enum +from pydantic import BaseSettings +from requests.auth import HTTPBasicAuth +from typing import Union + +from gso import settings + + +class V4ServiceNetwork(BaseSettings): + v4: ipaddress.IPv4Network + + +class V6ServiceNetwork(BaseSettings): + v6: ipaddress.IPv6Network + + +class ServiceNetworks(BaseSettings): + v4: ipaddress.IPv4Network + v6: ipaddress.IPv6Network + + +class V4HostAddress(BaseSettings): + v4: ipaddress.IPv4Address + + +class V6HostAddress(BaseSettings): + v6: ipaddress.IPv6Address + + +class HostAddresses(BaseSettings): + v4: ipaddress.IPv4Address + v6: ipaddress.IPv6Address + + +class IPAMErrors(Enum): + # HTTP error code, match in error message + CONTAINER_FULL = 400, "Can not find requested number of networks" + NETWORK_FULL = 400, \ + "Cannot find 1 available IP address(es) in this network" + EXTATTR_UNKNOWN = 400, "Unknown extensible attribute" + EXTATTR_BADVALUE = 400, "Bad value for extensible attribute" + + +# TODO: remove this! +# lab infoblox cert is not valid for the ipv4 address +# ... disable warnings for now +requests.packages.urllib3.disable_warnings() + + +def _match_error_code(response, error_code): + return response.status_code == error_code.value[0] \ + and error_code.value[1] in response.text + + +def _wapi(infoblox_params: settings.InfoBloxParams): + return (f'https://{infoblox_params.host}' + f'/wapi/{infoblox_params.wapi_version}') + + +def _ip_addr_version(addr): + ip_version = None + ip_addr = ipaddress.ip_address(addr) + if isinstance(ip_addr, ipaddress.IPv4Address): + ip_version = 4 + elif isinstance(ip_addr, ipaddress.IPv6Address): + ip_version = 6 + assert ip_version in [4, 6] + return ip_version + + +def _ip_network_version(network): + ip_version = None + ip_network = ipaddress.ip_network(network) + if isinstance(ip_network, ipaddress.IPv4Network): + ip_version = 4 + elif isinstance(ip_network, ipaddress.IPv6Network): + ip_version = 6 + assert ip_version in [4, 6] + return ip_version + + +def _assert_host_in_service( + ipv4_addr='', ipv6_addr='', + oss_ipv4_containers=None, oss_ipv6_containers=None, + oss_ipv4_networks=None, oss_ipv6_networks=None +): + # IPv4 + if oss_ipv4_containers: + assert any(ipv4_addr in oss_ipv4_container + for oss_ipv4_container in oss_ipv4_containers), \ + "Host's IPv4 address doesn't belong to service type." + else: + assert any(ipv4_addr in oss_ipv4_network + for oss_ipv4_network in oss_ipv4_networks), \ + "Host's IPv4 address doesn't belong to service type." + + # IPv6 + if oss_ipv6_containers: + assert any(ipv6_addr in oss_ipv6_container + for oss_ipv6_container in oss_ipv6_containers), \ + "Host's IPv6 address doesn't belong to service type." + else: + assert any(ipv6_addr in oss_ipv6_network + for oss_ipv6_network in oss_ipv6_networks), \ + "Host's IPv6 address doesn't belong to service type." + + +def _find_networks(network_container=None, network=None, ip_version=4): + """ + If network_container is not None, find all networks within the specified + container. + Otherwise, if network is not None, find the specified network. + Otherwise find all networks. + A list of all found networks is returned (an HTTP 200 code + may be returned with an empty list.) + """ + assert ip_version in [4, 6] + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + endpoint = 'network' if ip_version == 4 else 'ipv6network' + params = None + if network_container: + params = {'network_container': network_container} + elif network: + params = {'network': network} + r = requests.get( + f'{_wapi(infoblox_params)}/{endpoint}', + params=params, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + return r.json() + + +def _allocate_network( + infoblox_params: settings.InfoBloxParams, + network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams], + ip_version=4, + comment="", + extattrs={} +) -> Union[V4ServiceNetwork, V6ServiceNetwork]: + assert ip_version in [4, 6] + endpoint = 'network' if ip_version == 4 else 'ipv6network' + ip_container = 'networkcontainer' if ip_version == 4 else \ + 'ipv6networkcontainer' + + assert network_params.containers, \ + "No containers available to allocate networks for this service." \ + "Maybe you want to allocate a host from a network directly?" + + # only return in the response the allocated network, not all available + # TODO: any validation needed for extrattrs wherever it's used? + req_payload = { + "network": { + "_object_function": "next_available_network", + "_parameters": { + "cidr": network_params.mask + }, + "_object": ip_container, + "_object_parameters": { + "network": str(network_params.containers[0]) + }, + "_result_field": "networks", + }, + "comment": comment, + "extattrs": extattrs + } + + container_index = 0 + while True: + r = requests.post( + f'{_wapi(infoblox_params)}/{endpoint}', + params={'_return_fields': 'network'}, + json=req_payload, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + headers={'content-type': "application/json"}, + verify=False + ) + if not _match_error_code(response=r, + error_code=IPAMErrors.CONTAINER_FULL): + break + # Container full: try with next valid container for service (if any) + container_index += 1 + if len(network_params.containers) < (container_index + 1): + break + req_payload["network"]["_object_parameters"]["network"] = \ + str(network_params.containers[container_index]) + + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + assert 'network' in r.json() + allocated_network = r.json()['network'] + if ip_version == 4: + return V4ServiceNetwork(v4=ipaddress.ip_network(allocated_network)) + else: + return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network)) + + +def allocate_service_ipv4_network(service_type='', comment="", extattrs={} + ) -> V4ServiceNetwork: + """ + Allocate IPv4 network within the container of the specified service type. + """ + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert hasattr(ipam_params, service_type) \ + and service_type != 'INFOBLOX', "Invalid service type." + return _allocate_network(ipam_params.INFOBLOX, + getattr(ipam_params, service_type).V4, + 4, + comment, + extattrs) + + +def allocate_service_ipv6_network(service_type='', comment="", extattrs={} + ) -> V6ServiceNetwork: + """ + Allocate IPv6 network within the container of the specified service type. + """ + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert hasattr(ipam_params, service_type) \ + and service_type != 'INFOBLOX', "Invalid service type." + return _allocate_network(ipam_params.INFOBLOX, + getattr(ipam_params, service_type).V6, + 6, + comment, + extattrs) + + +def _find_next_available_ip(infoblox_params, network_ref=''): + """ + Find the next available IP address from a network given its ref. + Returns "NETWORK_FULL" if there's no space in the network. + Otherwise returns the next available IP address in the network. + """ + r = requests.post( + f'{_wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1', # noqa: E501 + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + + if _match_error_code(response=r, + error_code=IPAMErrors.NETWORK_FULL): + return "NETWORK_FULL" + + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert 'ips' in r.json() + received_ip = r.json()['ips'] + assert len(received_ip) == 1 + return received_ip[0] + + +def _allocate_host(hostname='', + addrs=None, + networks=None, + cname_aliases=[], + dns_view="default", + extattrs={} + ) -> Union[HostAddresses, str]: + """ + If networks is not None, allocate host in those networks. + Otherwise if addrs is not None, allocate host with those addresses. + hostname parameter must be full name including domain name. + Return "IPV4_NETWORK_FULL" or "IPV6_NETWORK_FULL" + if couldn't allocate host due to requested network being full. + Return "IPV4_NETWORK_NOT_FOUND" or "IPV6_NETWORK_NOT_FOUND" + if couldn't allocate host due to requested network not existing. + """ + # TODO: should hostnames be unique + # (i.e. fail if hostname already exists in this domain/service)? + assert addrs or networks, \ + "You must specify either the host addresses or the networks CIDR." + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + + if networks: + ipv4_network = networks[0] + ipv6_network = networks[1] + assert _ip_network_version(ipv4_network) == 4 + assert _ip_network_version(ipv6_network) == 6 + + # Find the next available IP address in each network + network_info = _find_networks(network=ipv4_network, ip_version=4) + if len(network_info) != 1: + return "IPV4_NETWORK_NOT_FOUND" + assert '_ref' in network_info[0] + ipv4_addr = _find_next_available_ip(infoblox_params, + network_info[0]["_ref"]) + + network_info = _find_networks(network=ipv6_network, ip_version=6) + if len(network_info) != 1: + return "IPV6_NETWORK_NOT_FOUND" + assert '_ref' in network_info[0] + ipv6_addr = _find_next_available_ip(infoblox_params, + network_info[0]["_ref"]) + + # If couldn't find next available IPs, return error + if ipv4_addr == "NETWORK_FULL" or ipv6_addr == "NETWORK_FULL": + if ipv4_addr == "NETWORK_FULL": + return "IPV4_NETWORK_FULL" + if ipv6_addr == "NETWORK_FULL": + return "IPV6_NETWORK_FULL" + + else: + ipv4_addr = addrs[0] + ipv6_addr = addrs[1] + assert _ip_addr_version(ipv4_addr) == 4 + assert _ip_addr_version(ipv6_addr) == 6 + + req_payload = { + "ipv4addrs": [ + { + "ipv4addr": ipv4_addr + } + ], + "ipv6addrs": [ + { + "ipv6addr": ipv6_addr + } + ], + "name": hostname, + "configure_for_dns": True, + "view": dns_view, + "extattrs": extattrs + } + + r = requests.post( + f'{_wapi(infoblox_params)}/record:host', + json=req_payload, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert isinstance(r.json(), str) + assert r.json().startswith("record:host/") + + if cname_aliases: + + cname_req_payload = { + "name": "", + "canonical": hostname, + "view": dns_view, + "extattrs": extattrs + } + + for alias in cname_aliases: + cname_req_payload["name"] = alias + r = requests.post( + f'{_wapi(infoblox_params)}/record:cname', + json=cname_req_payload, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert r.json().startswith("record:cname/") + + return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), + v6=ipaddress.ip_address(ipv6_addr)) + + +def allocate_service_host(hostname='', + service_type='', + service_networks: ServiceNetworks = None, + host_addresses: HostAddresses = None, + cname_aliases=None, + extattrs={} + ) -> HostAddresses: + """ + Allocate host record with both IPv4 and IPv6 address, and respective DNS + A and AAAA records. + - If service_networks is provided, and it's in a valid container, + that one is used. + - If service_networks is not provided, and host_addresses is provided, + those specific addresses are used. + - If neither is provided: + - If service has configured containers, new ipv4 and ipv6 networks are + created and those are used. Note that in this case extattrs is for the + hosts and not for the networks. + - If service doesn't have configured containers and has configured + networks instead, the configured networks are used (they are filled up + in order of appearance in the configuration file). + The domain name is taken from the service type and appended to the + specified hostname. + """ + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + + assert hasattr(ipam_params, service_type) \ + and service_type != 'INFOBLOX', "Invalid service type." + oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers + oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers + oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks + oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks + domain_name = getattr(ipam_params, service_type).domain_name + dns_view = getattr(ipam_params, service_type).dns_view + + assert (oss_ipv4_containers and oss_ipv6_containers) \ + or (oss_ipv4_networks and oss_ipv6_networks), \ + "This service is missing either containers or networks configuration." + assert domain_name, "This service is missing domain_name configuration." + assert dns_view, "This service is missing dns_view configuration." + + if cname_aliases: + cname_aliases = [alias + domain_name for alias in cname_aliases] + + if not service_networks and not host_addresses: + if oss_ipv4_containers and oss_ipv6_containers: + # This service has configured containers. + # Use them to allocate new networks that can allocate the hosts. + + # IPv4 + ipv4_network = str(allocate_service_ipv4_network( + service_type=service_type).v4) + assert ipv4_network, \ + "No available space for IPv4 networks for this service type." + + # IPv6 + ipv6_network = str(allocate_service_ipv6_network( + service_type=service_type).v6) + assert ipv6_network, \ + "No available space for IPv6 networks for this service type." + + elif oss_ipv4_networks and oss_ipv6_networks: + # This service has configured networks. + # Allocate a host inside an ipv4 and ipv6 network from among them. + ipv4_network = str(oss_ipv4_networks[0]) + ipv6_network = str(oss_ipv6_networks[0]) + + ipv4_network_index = 0 + ipv6_network_index = 0 + while True: + network_tuple = (ipv4_network, ipv6_network) + host = _allocate_host(hostname=hostname+domain_name, + networks=network_tuple, + cname_aliases=cname_aliases, + dns_view=dns_view, + extattrs=extattrs) + + if "NETWORK_FULL" not in host and "NETWORK_NOT_FOUND" not in host: + break + elif "IPV4" in host: + ipv4_network_index += 1 + assert oss_ipv4_networks, \ + "No available space in any IPv4 network for this service." + assert ipv4_network_index < len(oss_ipv4_networks), \ + "No available space in any IPv4 network for this service." + ipv4_network = str(oss_ipv4_networks[ipv4_network_index]) + else: # "IPV6" in host + ipv6_network_index += 1 + assert oss_ipv6_networks, \ + "No available space in any IPv6 network for this service." + assert ipv6_network_index < len(oss_ipv6_networks), \ + "No available space in any IPv6 network for this service." + ipv6_network = str(oss_ipv6_networks[ipv6_network_index]) + + elif service_networks: + # IPv4 + ipv4_network = service_networks.v4 + if oss_ipv4_containers: + assert any(ipv4_network.subnet_of(oss_ipv4_container) + for oss_ipv4_container in oss_ipv4_containers) + else: + assert ipv4_network in oss_ipv4_networks + + # IPv6 + ipv6_network = service_networks.v6 + if oss_ipv6_containers: + assert any(ipv6_network.subnet_of(oss_ipv6_container) + for oss_ipv6_container in oss_ipv6_containers) + else: + assert ipv6_network in oss_ipv6_networks + + host = _allocate_host( + hostname=hostname+domain_name, + networks=(str(ipv4_network), str(ipv6_network)), + cname_aliases=cname_aliases, + dns_view=dns_view, + extattrs=extattrs + ) + assert "NETWORK_FULL" not in host, \ + "Network is full." + assert "NETWORK_NOT_FOUND" not in host, \ + "Network does not exist. Create it first." + + elif host_addresses: + ipv4_addr = host_addresses.v4 + ipv6_addr = host_addresses.v6 + _assert_host_in_service( + ipv4_addr, ipv6_addr, + oss_ipv4_containers, oss_ipv6_containers, + oss_ipv4_networks, oss_ipv6_networks + ) + + host = _allocate_host( + hostname=hostname+domain_name, + addrs=(str(ipv4_addr), str(ipv6_addr)), + cname_aliases=cname_aliases, + dns_view=dns_view, + extattrs=extattrs + ) + assert "NETWORK_FULL" not in host + + return host + + +def delete_service_network(ipnetwork=None, service_type='' + ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: + """ + Delete IPv4 or IPv6 network by CIDR. + """ + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert ipam_params.INFOBLOX + infoblox_params = ipam_params.INFOBLOX + + assert hasattr(ipam_params, service_type) \ + and service_type != 'INFOBLOX', "Invalid service type." + + network = str(ipnetwork) + ip_version = _ip_network_version(network) + + # Ensure that the network to be deleted is under the service type. + # Otherwise user is not allowed to delete it + if ip_version == 4: + oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers + oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks + if oss_ipv4_containers: + assert any(ipnetwork.subnet_of(oss_ipv4_container) + for oss_ipv4_container in oss_ipv4_containers), \ + "Can't delete: network doesn't belong to service type." + else: + assert ipnetwork in oss_ipv4_networks, \ + "Can't delete: network doesn't belong to service type." + + else: + oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers + oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks + if oss_ipv6_containers: + assert any(ipnetwork.subnet_of(oss_ipv6_container) + for oss_ipv6_container in oss_ipv6_containers), \ + "Can't delete: network doesn't belong to service type." + else: + assert ipnetwork in oss_ipv6_networks, \ + "Can't delete: network doesn't belong to service type." + + network_info = _find_networks(network=network, ip_version=ip_version) + assert len(network_info) == 1, "Network does not exist." + assert '_ref' in network_info[0] + + r = requests.delete( + f'{_wapi(infoblox_params)}/{network_info[0]["_ref"]}', + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + # Extract ipv4/ipv6 address from the network reference obtained in the + # response + r_text = r.text + print(r_text) + network_address = ipaddress.ip_network( + r_text.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) + if ip_version == 4: + return V4ServiceNetwork(v4=ipaddress.ip_network(network_address)) + else: + return V6ServiceNetwork(v6=ipaddress.ip_network(network_address)) + + +def delete_service_host( + hostname='', + host_addresses: HostAddresses = None, + cname_aliases=[], + service_type='' +) -> Union[V4HostAddress, V6HostAddress]: + """ + Delete host record and associated CNAME records. + All arguments passed to this function must match together a host record in + IPAM, and all CNAME records associated to it must also be passed exactly. + """ + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert ipam_params.INFOBLOX + infoblox_params = ipam_params.INFOBLOX + + assert hasattr(ipam_params, service_type) \ + and service_type != 'INFOBLOX', "Invalid service type." + oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers + oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers + oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks + oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks + domain_name = getattr(ipam_params, service_type).domain_name + dns_view = getattr(ipam_params, service_type).dns_view + ipv4_addr = str(host_addresses.v4) + ipv6_addr = str(host_addresses.v6) + + _assert_host_in_service( + host_addresses.v4, host_addresses.v6, + oss_ipv4_containers, oss_ipv6_containers, + oss_ipv4_networks, oss_ipv6_networks + ) + + # Find host record reference + r = requests.get( + f'{_wapi(infoblox_params)}/record:host', + params={ + 'name': (hostname+domain_name).lower(), # hostnames are lowercase + 'ipv4addr': ipv4_addr, + 'ipv6addr': ipv6_addr, + 'view': dns_view, + }, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + host_data = r.json() + assert len(host_data) == 1, "Host does not exist." + assert '_ref' in host_data[0] + host_ref = host_data[0]['_ref'] + + # Find cname records reference + r = requests.get( + f'{_wapi(infoblox_params)}/record:cname', + params={ + 'canonical': hostname+domain_name, + "view": dns_view, + }, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + cname_data = r.json() + provided_cnames = [item + domain_name for item in cname_aliases] + found_cnames = [item['name'] for item in cname_data if 'name' in item] + assert provided_cnames == found_cnames, \ + "Provided CNAME alias names don't match the ones poiting to hostname." + + # Delete the host record + r = requests.delete( + f'{_wapi(infoblox_params)}/{host_ref}', + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + # Delete the CNAME records + cname_refs = [item['_ref'] for item in cname_data if 'name' in item] + for cname_ref in cname_refs: + r = requests.delete( + f'{_wapi(infoblox_params)}/{cname_ref}', + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + return host_addresses + + +""" +Below methods are not used for supported outside calls +""" + +''' +def _find_containers(network=None, ip_version=4): + """ + If network is not None, find that container. + Otherwise find all containers. + """ + assert ip_version in [4, 6] + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + endpoint = 'networkcontainer' if ip_version == 4 \ + else 'ipv6networkcontainer' + r = requests.get( + f'{_wapi(infoblox_params)}/{endpoint}', + params={'network': network} if network else None, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + return r.json() + + +def _get_network_capacity(network=None): + """ + Get utilization of a IPv4 network in a fraction of 1000. + """ + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + + ip_version = _ip_network_version(network) + assert ip_version == 4, "Utilization is only available for IPv4 networks." + params = { + 'network': network, + '_return_fields': 'network,total_hosts,utilization' + } + + r = requests.get( + f'{_wapi(infoblox_params)}/network', + params=params, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + # Utilization info takes several minutes to converge. + # The IPAM utilization bar in the GUI as well. Why? + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + capacity_info = r.json() + assert len(capacity_info) == 1, "Requested IPv4 network doesn't exist." + assert 'utilization' in capacity_info[0] + utilization = capacity_info[0]['utilization'] + return utilization + + +def _get_network_usage_status(network): + """ + Get status and usage fields of all hosts in the specified ipv4 or ipv6 + network. + """ + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + + ip_version = _ip_network_version(network) + endpoint = 'ipv4address' if ip_version == 4 else 'ipv6address' + + r = requests.get( + f'{_wapi(infoblox_params)}/{endpoint}', + params={ + '_return_fields': 'ip_address,status,usage', + 'network': network}, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + return r.json() +''' diff --git a/gso/services/ipam.py b/gso/services/ipam.py index 96c957d07883ac644fa202e69614e5bb6049dacf..724db9019076031eda23e9bc4faa5de950531d3a 100644 --- a/gso/services/ipam.py +++ b/gso/services/ipam.py @@ -1,5 +1,6 @@ import ipaddress from pydantic import BaseSettings +from typing import Union from gso.services import _ipam @@ -30,7 +31,7 @@ class HostAddresses(BaseSettings): v6: ipaddress.IPv6Address -def new_service_networks(service_type, +def new_service_networks(service_type='', comment="", extattrs={}) -> ServiceNetworks: v4_service_network = _ipam.allocate_service_ipv4_network( @@ -43,7 +44,7 @@ def new_service_networks(service_type, def new_service_host(hostname, - service_type, + service_type='', service_networks: ServiceNetworks = None, host_addresses: HostAddresses = None, cname_aliases=None, @@ -57,53 +58,24 @@ def new_service_host(hostname, extattrs=extattrs) -if __name__ == '__main__': - # sample call flow to allocate two loopback interfaces and a trunk service - # new_service_host can be called passing networks, addresses, or nothing. - # - host h1 for service TRUNK uses a specific ipv4/ipv6 address pair - # - host h2 for service TRUNK uses the ipv4/ipv6 network pair - # - service LO uses nothing - # networks and hosts can be allocated with extensible attributes - # networks can be created with a comment - # CNAME records can be optionally created - - hostname_A = 'hA' - hostname_B = 'hB' - - # h1 LO (loopback) - new_service_host(hostname=hostname_A+"_LO", - cname_aliases=["alias1.hA", "alias2.hA"], - service_type='LO') - - # h2 LO (loopback) - new_service_host(hostname=hostname_B+"_LO", - cname_aliases=["alias1.hB"], - service_type='LO') - - # h1-h2 TRUNK - trunk12_network_extattrs = { - "vrf_name": {"value": "dummy_vrf"}, - } - trunk12_host_extattrs = { - "Site": {"value": "dummy_site"}, - } - trunk12_service_networks = new_service_networks( - service_type='TRUNK', - extattrs=trunk12_network_extattrs, - comment="Network for h1-h2 TRUNK" +def delete_service_network( + network: ipaddress.ip_network = None, service_type='' +) -> Union[V4ServiceNetwork, V6ServiceNetwork]: + return _ipam.delete_service_network( + ipnetwork=network, + service_type=service_type ) - trunk12_v4_host_address = trunk12_service_networks.v4.network_address - trunk12_v6_host_address = trunk12_service_networks.v6.network_address - trunk12_host_addresses = HostAddresses(v4=trunk12_v4_host_address, - v6=trunk12_v6_host_address) - new_service_host(hostname=hostname_A+"_TRUNK", - service_type='TRUNK', - host_addresses=trunk12_host_addresses, - extattrs=trunk12_host_extattrs) - - new_service_host(hostname=hostname_B+"_TRUNK", - service_type='TRUNK', - service_networks=trunk12_service_networks, - extattrs=trunk12_host_extattrs) +def delete_service_host( + hostname='', + host_addresses: HostAddresses = None, + cname_aliases=[], + service_type='' +) -> HostAddresses: + return _ipam.delete_service_host( + hostname=hostname, + host_addresses=host_addresses, + cname_aliases=cname_aliases, + service_type=service_type + ) diff --git a/gso/settings.py b/gso/settings.py index 016b571b95218218cad8039445042d967965e85d..a0be09616096e5916526878f0875a27108eb8fc3 100644 --- a/gso/settings.py +++ b/gso/settings.py @@ -33,7 +33,7 @@ class V4NetworkParams(BaseSettings): """ containers: list[ipaddress.IPv4Network] networks: list[ipaddress.IPv4Network] - mask: int = Field(None, ge=0, le=32) + mask: int # TODO: validation on mask? class V6NetworkParams(BaseSettings): @@ -42,7 +42,7 @@ class V6NetworkParams(BaseSettings): """ containers: list[ipaddress.IPv6Network] networks: list[ipaddress.IPv6Network] - mask: int = Field(None, ge=0, le=128) + mask: int # TODO: validation on mask? class ServiceNetworkParams(BaseSettings): @@ -53,6 +53,7 @@ class ServiceNetworkParams(BaseSettings): V4: V4NetworkParams V6: V6NetworkParams domain_name: str + dns_view: str class IPAMParams(BaseSettings): diff --git a/gso/workflows/device/create_device.py b/gso/workflows/device/create_device.py index f7fcaab91f1486328dcaf7ccdfeb5232d06ed66d..62d319586de87a52fdb4026ecd5a43ab07c0ac19 100644 --- a/gso/workflows/device/create_device.py +++ b/gso/workflows/device/create_device.py @@ -19,7 +19,6 @@ from gso.products.product_types import device from gso.products.product_types.device import DeviceInactive, \ DeviceProvisioning from gso.products.product_types.site import Site -# noinspection PyProtectedMember from gso.services import _ipam from gso.services import provisioning_proxy from gso.services.provisioning_proxy import await_pp_results, \ @@ -97,14 +96,11 @@ def get_info_from_ipam(subscription: DeviceProvisioning) -> State: subscription.device.device_lo_iso_address \ = iso_from_ipv4(str(subscription.device.device_lo_ipv4_address)) subscription.device.device_si_ipv4_network \ - = _ipam.allocate_service_ipv4_network(service_type='SI', - comment=f'SI for {lo0_name}').v4 + = _ipam.allocate_service_ipv4_network(service_type='SI', comment=f"SI for {lo0_name}").v4 subscription.device.device_ias_lt_ipv4_network \ - = _ipam.allocate_service_ipv4_network(service_type='LT_IAS', - comment=f'LT for {lo0_name}').v4 + = _ipam.allocate_service_ipv4_network(service_type='LT_IAS', comment=f"LT for {lo0_name}").v4 subscription.device.device_ias_lt_ipv6_network \ - = _ipam.allocate_service_ipv6_network(service_type='LT_IAS', - comment=f'LT for {lo0_name}').v6 + = _ipam.allocate_service_ipv6_network(service_type='LT_IAS', comment=f"LT for {lo0_name}").v6 return {'subscription': subscription} diff --git a/gso/workflows/iptrunk/create_iptrunk.py b/gso/workflows/iptrunk/create_iptrunk.py index 326793b9bad2f19651062c4a059b30dce002fca1..e90d42841a92bbeadd49956c25c99fbbcaee6dcf 100644 --- a/gso/workflows/iptrunk/create_iptrunk.py +++ b/gso/workflows/iptrunk/create_iptrunk.py @@ -17,7 +17,6 @@ from gso.products.product_blocks.iptrunk import IptrunkType from gso.products.product_types.device import Device from gso.products.product_types.iptrunk import IptrunkInactive, \ IptrunkProvisioning -# noinspection PyProtectedMember from gso.services import provisioning_proxy, _ipam from gso.services.provisioning_proxy import confirm_pp_results, \ await_pp_results @@ -106,16 +105,11 @@ def create_subscription(product: UUIDstr) -> State: @step('Get information from IPAM') def get_info_from_ipam(subscription: IptrunkProvisioning) -> State: + # TODO: get info about how these should be generated subscription.iptrunk.iptrunk_ipv4_network \ - = _ipam.allocate_service_ipv4_network( - service_type='TRUNK', - comment=subscription.iptrunk.iptrunk_description - ).v4 + = _ipam.allocate_service_ipv4_network(service_type="TRUNK", comment=subscription.iptrunk.iptrunk_description).v4 subscription.iptrunk.iptrunk_ipv6_network \ - = _ipam.allocate_service_ipv6_network( - service_type='TRUNK', - comment=subscription.iptrunk.iptrunk_description - ).v6 + = _ipam.allocate_service_ipv6_network(service_type="TRUNK", comment=subscription.iptrunk.iptrunk_description).v6 return {'subscription': subscription} diff --git a/test/conftest.py b/test/conftest.py index 0edc5f13f9a40f199fe3e2d696b16d4d17b0c603..04952145b61bd374ca02dc44abb1c00c522f9ac8 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -27,11 +27,18 @@ def configuration_data(): "password": "robot-user-password" }, "LO": { - "V4": {"containers": ["10.255.255.0/24"], "networks": [], - "mask": 32}, - "V6": {"containers": ["dead:beef::/64"], "networks": [], - "mask": 128}, - "domain_name": ".lo" + "V4": { + "containers": [], + "networks": ["10.255.255.0/26"], + "mask": 32 + }, + "V6": { + "containers": [], + "networks": ["dead:beef::/80"], + "mask": 128 + }, + "domain_name": ".lo", + "dns_view": "default" }, "TRUNK": { "V4": { @@ -44,7 +51,8 @@ def configuration_data(): "networks": [], "mask": 126 }, - "domain_name": ".trunk" + "domain_name": ".trunk", + "dns_view": "default" }, "GEANT_IP": { "V4": { @@ -57,7 +65,8 @@ def configuration_data(): "networks": [], "mask": 126 }, - "domain_name": ".geantip" + "domain_name": ".geantip", + "dns_view": "default" }, "SI": { "V4": { @@ -70,7 +79,8 @@ def configuration_data(): "networks": [], "mask": 126 }, - "domain_name": ".geantip" + "domain_name": ".geantip", + "dns_view": "default" }, "LT_IAS": { "V4": { @@ -83,7 +93,8 @@ def configuration_data(): "networks": [], "mask": 126 }, - "domain_name": ".geantip" + "domain_name": ".geantip", + "dns_view": "default" } }, "PROVISIONING_PROXY": { diff --git a/test/test_ipam.py b/test/test_ipam.py index 23c1aec2ce6598a64d34e65f6cf421839af85629..e3191bcc3daf920beceaed175d9884f7b553ba7d 100644 --- a/test/test_ipam.py +++ b/test/test_ipam.py @@ -1,4 +1,5 @@ import ipaddress +import pytest import re import responses @@ -12,8 +13,7 @@ def test_new_service_networks(data_config_filename): method=responses.POST, url=re.compile(r'.*/wapi.*/network.*'), json={ - '_ref': 'network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.2' - '55.20/32/default', # noqa: E501 + '_ref': 'network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default', # noqa: E501 'network': '10.255.255.20/32' } ) @@ -22,88 +22,126 @@ def test_new_service_networks(data_config_filename): method=responses.POST, url=re.compile(r'.*/wapi.*/ipv6network.*'), json={ - '_ref': 'ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:de' - 'ad%3Abeef%3A%3A18/128/default', # noqa: E501 + '_ref': 'ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default', # noqa: E501 'network': 'dead:beef::18/128' } ) - service_networks = ipam.new_service_networks(service_type='LO') + service_networks = ipam.new_service_networks(service_type='TRUNK') assert service_networks == ipam.ServiceNetworks( v4=ipaddress.ip_network('10.255.255.20/32'), v6=ipaddress.ip_network('dead:beef::18/128') ) + # should fail because this service type has networks instead of containers + with pytest.raises(AssertionError): + service_networks = ipam.new_service_networks(service_type='LO') + assert service_networks is None + @responses.activate def test_new_service_host(data_config_filename): responses.add( method=responses.POST, url=re.compile(r'.*/wapi.*/record:host$'), - json='record:host/ZG5zLmhvc3QkLm5vbl9ETlNfaG9zdF9yb290LjAuMTY4MzcwNTU4' - 'MzY3MC5nc28udGVzdA:test.lo/%20' # noqa: E501 + json='record:host/ZG5zLmhvc3QkLm5vbl9ETlNfaG9zdF9yb290LjAuMTY4MzcwNTU4MzY3MC5nc28udGVzdA:test.lo/%20' # noqa: E501 ) responses.add( method=responses.POST, url=re.compile(r'.*/wapi.*/record:a$'), - json='record:a/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS4' - '4:test.lo/default' # noqa: E501 + json='record:a/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default' # noqa: E501 ) responses.add( method=responses.POST, url=re.compile(r'.*/wapi.*/record:aaaa$'), - json='record:aaaa/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1' - 'NS44:test.lo/default' # noqa: E501 + json='record:aaaa/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default' # noqa: E501 ) responses.add( method=responses.GET, - url=re.compile(r'.*/wapi.*/network.*'), + url=re.compile(r'.*/wapi.*/network.*10.255.255.*'), json=[ { - "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.2" - "55.255.20/32/default", # noqa: E501 + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 "network": "10.255.255.20/32", "network_view": "default" } ] + ) + responses.add( + method=responses.GET, + url=re.compile(r'.*/wapi.*/network.*10.255.254.*'), + json=[ + { + "_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:10.255.254.20/32/default", # noqa: E501 + "network": "10.255.254.20/32", + "network_view": "default" + } + ] ) responses.add( method=responses.GET, - url=re.compile(r'.*/wapi.*/ipv6network.*'), + url=re.compile(r'.*/wapi.*/ipv6network.*dead.*beef.*'), json=[ { - "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvM" - "A:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 "network": "dead:beef::18/128", "network_view": "default" } ] ) + responses.add( + method=responses.GET, + url=re.compile(r'.*/wapi.*/ipv6network.*beef.*dead.*'), + json=[] + ) + responses.add( method=responses.POST, - url=re.compile( - r'.*/wapi.*/network.*/.*?_function=next_available_ip&num=1.*'), - # noqa: E501 + url=re.compile(r'.*/wapi.*/network/.*10.255.255.*?_function=next_available_ip&num=1$'), # noqa: E501 json={'ips': ['10.255.255.20']} ) responses.add( method=responses.POST, - url=re.compile( - r'.*/wapi.*/ipv6network.*/.*?_function=next_available_ip&num=1.*'), - # noqa: E501 + url=re.compile(r'.*/wapi.*/network/.*10.255.254.*?_function=next_available_ip&num=1$'), # noqa: E501 + body="Cannot find 1 available IP address(es) in this network", + status=400 + ) + + responses.add( + method=responses.POST, + url=re.compile(r'.*/wapi.*/ipv6network/.*?_function=next_available_ip&num=1$'), # noqa: E501 json={'ips': ['dead:beef::18']} ) + responses.add( + method=responses.POST, + url=re.compile(r'.*/wapi.*/network.*_return_fields.*'), + json={ + '_ref': 'network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default', # noqa: E501 + 'network': '10.255.255.20/32' + } + ) + + responses.add( + method=responses.POST, + url=re.compile(r'.*/wapi.*/ipv6network.*_return_fields.*'), + json={ + '_ref': 'ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default', # noqa: E501 + 'network': 'dead:beef::18/128' + } + ) + + # test host creation by IP addresses service_hosts = ipam.new_service_host( hostname='test', - service_type='LO', + service_type='TRUNK', host_addresses=ipam.HostAddresses( v4=ipaddress.ip_address('10.255.255.20'), v6=ipaddress.ip_address('dead:beef::18') @@ -114,9 +152,10 @@ def test_new_service_host(data_config_filename): v6=ipaddress.ip_address('dead:beef::18') ) + # test host creation by network addresses service_hosts = ipam.new_service_host( hostname='test', - service_type='LO', + service_type='TRUNK', service_networks=ipam.ServiceNetworks( v4=ipaddress.ip_network('10.255.255.20/32'), v6=ipaddress.ip_network('dead:beef::18/128') @@ -126,3 +165,245 @@ def test_new_service_host(data_config_filename): v4=ipaddress.ip_address('10.255.255.20'), v6=ipaddress.ip_address('dead:beef::18') ) + + # test host creation by just service_type when service cfg uses networks + service_hosts = ipam.new_service_host( + hostname='test', + service_type='LO' + ) + assert service_hosts == ipam.HostAddresses( + v4=ipaddress.ip_address('10.255.255.20'), + v6=ipaddress.ip_address('dead:beef::18') + ) + + # test host creation by just service_type when service cfg uses containers + service_hosts = ipam.new_service_host( + hostname='test', + service_type='TRUNK' + ) + assert service_hosts == ipam.HostAddresses( + v4=ipaddress.ip_address('10.255.255.20'), + v6=ipaddress.ip_address('dead:beef::18') + ) + + # test host creation that should return a no available IP error + with pytest.raises(AssertionError): + service_hosts = ipam.new_service_host( + hostname='test', + service_type='TRUNK', + service_networks=ipam.ServiceNetworks( + v4=ipaddress.ip_network('10.255.254.20/32'), + v6=ipaddress.ip_network('dead:beef::18/128') + ) + ) + assert service_hosts is None + + # test host creation that should return a network not exist error + with pytest.raises(AssertionError): + service_hosts = ipam.new_service_host( + hostname='test', + service_type='TRUNK', + service_networks=ipam.ServiceNetworks( + v4=ipaddress.ip_network('10.255.255.20/32'), + v6=ipaddress.ip_network('beef:dead::18/128') + ) + ) + assert service_hosts is None + + +@responses.activate +def test_delete_service_network(data_config_filename): + + responses.add( + method=responses.GET, + url=re.compile(r'.*/wapi.*/network.*10.255.255.0.*'), + json=[ + { + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 + "network": "10.255.255.0/26", + "network_view": "default" + } + ] + ) + + responses.add( + method=responses.GET, + url=re.compile(r'.*/wapi.*/network.*10.255.255.20.*'), + json=[ + { + "_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501 + "network": "100.255.255.20/32", + "network_view": "default" + } + ] + ) + + responses.add( + method=responses.GET, + url=re.compile(r'.*/wapi.*/ipv6network.*dead.*beef.*'), + json=[ + { + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + "network": "dead:beef::18/128", + "network_view": "default" + } + ] + ) + + responses.add( + method=responses.GET, + url=re.compile(r'.*/wapi.*/ipv6network.*beef.*dead.*'), + json=[ + { + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501 + "network": "beef:dead::18/128", + "network_view": "default" + } + ] + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r'.*/wapi.*/network.*10.255.255.0.*'), + body="network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default" # noqa: E501 + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r'.*/wapi.*/network.*100.255.255.*'), + body="network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default" # noqa: E501 + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r'.*/wapi.*/ipv6network.*dead.*beef.*'), + body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default" # noqa: E501 + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r'.*/wapi.*/ipv6network.*beef.*dead.*'), + body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default" # noqa: E501 + ) + + service_network = ipam.delete_service_network( + network=ipaddress.ip_network('10.255.255.0/26'), service_type='LO' + ) + assert service_network == ipam.V4ServiceNetwork( + v4=ipaddress.ip_network('10.255.255.0/26') + ) + + with pytest.raises(AssertionError): + service_network = ipam.delete_service_network( + network=ipaddress.ip_network('10.255.255.20/32'), + service_type='LO' + ) + assert service_network is None + + service_network = ipam.delete_service_network( + network=ipaddress.ip_network('dead:beef::18/128'), + service_type='TRUNK' + ) + assert service_network == ipam.V6ServiceNetwork( + v6=ipaddress.ip_network('dead:beef::18/128') + ) + + with pytest.raises(AssertionError): + service_network = ipam.delete_service_network( + network=ipaddress.ip_network('beef:dead::18/128'), + service_type='TRUNK' + ) + assert service_network is None + + +@responses.activate +def test_delete_service_host(data_config_filename): + + responses.add( + method=responses.GET, + url=re.compile(r'.*/wapi.*record:host.*'), + json=[ + { + '_ref': 'record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default', # noqa: E501 + 'ipv4addrs': [ + { + '_ref': 'record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuMS40.255.255.1/ha_lo.gso/default', # noqa: E501 + 'configure_for_dhcp': False, + 'host': 'ha_lo.gso', 'ipv4addr': '10.255.255.1' + } + ], + 'ipv6addrs': [ + { + '_ref': 'record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4:dead%3Abeef%3A%3A1/ha_lo.gso/default', # noqa: E501 + 'configure_for_dhcp': False, + 'host': 'ha_lo.gso', 'ipv6addr': 'dead:beef::1' + } + ], + 'name': 'ha_lo.gso', 'view': 'default' + } + ] + ) + + responses.add( + method=responses.GET, + url=re.compile(r'.*/wapi.*record:cname.*'), + json=[ + { + '_ref': 'record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczE:alias1.ha.gso/default', # noqa: E501 + 'canonical': 'hA_LO.lo', 'name': 'alias1.ha.lo', + 'view': 'default' + }, + { + '_ref': 'record:cname/5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczI:alias2.ha.gso/default', # noqa: E501 + 'canonical': 'hA_LO.lo', 'name': 'alias2.ha.lo', + 'view': 'default' + } + ] + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r'.*/wapi.*record:host.*'), + body='record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYl9sbw:hb_lo.gso/default' # noqa: E501 + ) + + responses.add( + method=responses.DELETE, + url=re.compile(r'.*/wapi.*record:cname.*'), + body='record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYi5hbGlhczE:alias1.hb.gso/default' # noqa: E501 + ) + + input_host_addresses = ipam.HostAddresses( + v4=ipaddress.ip_address('10.255.255.1'), + v6=ipaddress.ip_address('dead:beef::1') + ) + host_addresses = ipam.delete_service_host( + hostname='ha_lo', + host_addresses=input_host_addresses, + cname_aliases=['alias1.ha', 'alias2.ha'], + service_type='LO' + ) + assert host_addresses == ipam.HostAddresses( + v4=ipaddress.ip_address('10.255.255.1'), + v6=ipaddress.ip_address('dead:beef::1') + ) + + # Fail because missing CNAME + with pytest.raises(AssertionError): + host_addresses = ipam.delete_service_host( + hostname='ha_lo', + host_addresses=input_host_addresses, + cname_aliases=['alias1.ha'], + service_type='LO' + ) + assert host_addresses is None + + # Fail because non-matching CNAME + with pytest.raises(AssertionError): + host_addresses = ipam.delete_service_host( + hostname='ha_lo', + host_addresses=input_host_addresses, + cname_aliases=['alias1.ha', 'alias2.ha', 'alias3.ha'], + service_type='LO' + ) + assert host_addresses is None