diff --git a/gso/oss-params-example.json b/gso/oss-params-example.json index afc41967d1dbe3eb333eca86567f4546db71dcdd..4f4bca6c8c29ea38ef67272251fe21409685296b 100644 --- a/gso/oss-params-example.json +++ b/gso/oss-params-example.json @@ -12,18 +12,18 @@ "password": "robot-user-password" }, "LO": { - "V4": {"containers": ["1.1.0.0/24"], "mask": 32}, - "V6": {"containers": ["dead:beef::/64"], "mask": 128}, + "V4": {"containers": ["1.1.0.0/24"], "networks": [], "mask": 32}, + "V6": {"containers": ["dead:beef::/64"], "networks": [], "mask": 128}, "domain_name": ".lo" }, "TRUNK": { - "V4": {"containers": ["1.1.1.0/24"], "mask": 31}, - "V6": {"containers": ["dead:beef::/64"], "mask": 126}, + "V4": {"containers": ["1.1.1.0/24"], "networks": [], "mask": 31}, + "V6": {"containers": ["dead:beef::/64"], "networks": [], "mask": 126}, "domain_name": ".trunk" }, "GEANT_IP": { - "V4": {"containers": ["1.1.2.0/24"], "mask": 31}, - "V6": {"containers": ["dead:beef::/64"], "mask": 126}, + "V4": {"containers": ["1.1.2.0/24"], "networks": [], "mask": 31}, + "V6": {"containers": ["dead:beef::/64"], "networks": [], "mask": 126}, "domain_name": ".geantip" } }, diff --git a/gso/services/_ipam.py b/gso/services/_ipam.py index 2707a53ddc0ef4c2111baca5b8ac640f80a9b614..42bc3d7004acd687c95798e86c5cb02b5c8187df 100644 --- a/gso/services/_ipam.py +++ b/gso/services/_ipam.py @@ -1,681 +1,678 @@ -import ipaddress -import requests -from enum import Enum -from pydantic import BaseSettings -from requests.auth import HTTPBasicAuth -from typing import Union - -from gso import settings - - -class V4ServiceNetwork(BaseSettings): - v4: ipaddress.IPv4Network - - -class V6ServiceNetwork(BaseSettings): - v6: ipaddress.IPv6Network - - -class ServiceNetworks(BaseSettings): - v4: ipaddress.IPv4Network - v6: ipaddress.IPv6Network - - -class V4HostAddress(BaseSettings): - v4: ipaddress.IPv4Address - - -class V6HostAddress(BaseSettings): - v6: ipaddress.IPv6Address - - -class HostAddresses(BaseSettings): - v4: ipaddress.IPv4Address - v6: ipaddress.IPv6Address - - -class IPAMErrors(Enum): - # HTTP error code, match in error message - CONTAINER_FULL = 400, "Can not find requested number of networks" - EXTATTR_UNKNOWN = 400, "Unknown extensible attribute" - EXTATTR_BADVALUE = 400, "Bad value for extensible attribute" - - -# TODO: remove this! -# lab infoblox cert is not valid for the ipv4 address -# ... disable warnings for now -requests.packages.urllib3.disable_warnings() - - -def _match_error_code(response, error_code): - return response.status_code == error_code.value[0] \ - and error_code.value[1] in response.text - - -def _wapi(infoblox_params: settings.InfoBloxParams): - return (f'https://{infoblox_params.host}' - f'/wapi/{infoblox_params.wapi_version}') - - -def _ip_addr_version(addr): - ip_version = None - ip_addr = ipaddress.ip_address(addr) - if isinstance(ip_addr, ipaddress.IPv4Address): - ip_version = 4 - elif isinstance(ip_addr, ipaddress.IPv6Address): - ip_version = 6 - assert ip_version in [4, 6] - return ip_version - - -def _ip_network_version(network): - ip_version = None - ip_network = ipaddress.ip_network(network) - if isinstance(ip_network, ipaddress.IPv4Network): - ip_version = 4 - elif isinstance(ip_network, ipaddress.IPv6Network): - ip_version = 6 - assert ip_version in [4, 6] - return ip_version - - -def _find_networks(network_container=None, network=None, ip_version=4): - """ - If network_container is not None, find all networks within the specified - container. - Otherwise, if network is not None, find the specified network. - Otherwise find all networks. - """ - assert ip_version in [4, 6] - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - endpoint = 'network' if ip_version == 4 else 'ipv6network' - params = None - if network_container: - params = {'network_container': network_container} - elif network: - params = {'network': network} - r = requests.get( - f'{_wapi(infoblox_params)}/{endpoint}', - params=params, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - # TODO: propagate "network not found" error to caller - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - return r.json() - - -def _get_network_capacity(network=None): - """ - Get utilization of a IPv4 network in a fraction of 1000. - """ - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - ip_version = _ip_network_version(network) - assert ip_version == 4, "Utilization is only available for IPv4 networks." - params = { - 'network': network, - '_return_fields': 'network,total_hosts,utilization' - } - - r = requests.get( - f'{_wapi(infoblox_params)}/network', - params=params, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - # Utilization info takes several minutes to converge. - # The IPAM utilization bar in the GUI as well. Why? - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - capacity_info = r.json() - assert len(capacity_info) == 1, "Requested IPv4 network doesn't exist." - assert 'utilization' in capacity_info[0] - utilization = capacity_info[0]['utilization'] - return utilization - - -def _allocate_network( - infoblox_params: settings.InfoBloxParams, - network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams], - ip_version=4, - comment="", - extattrs={} -) -> Union[V4ServiceNetwork, V6ServiceNetwork]: - assert ip_version in [4, 6] - endpoint = 'network' if ip_version == 4 else 'ipv6network' - ip_container = 'networkcontainer' if ip_version == 4 else \ - 'ipv6networkcontainer' - - # only return in the response the allocated network, not all available - # TODO: any validation needed for extrattrs wherever it's used? - req_payload = { - "network": { - "_object_function": "next_available_network", - "_parameters": { - "cidr": network_params.mask - }, - "_object": ip_container, - "_object_parameters": { - "network": str(network_params.containers[0]) - }, - "_result_field": "networks", - }, - "comment": comment, - "extattrs": extattrs - } - - container_index = 0 - while True: - r = requests.post( - f'{_wapi(infoblox_params)}/{endpoint}', - params={'_return_fields': 'network'}, - json=req_payload, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - headers={'content-type': "application/json"}, - verify=False - ) - if not _match_error_code(response=r, - error_code=IPAMErrors.CONTAINER_FULL): - break - # Container full: try with next valid container for service (if any) - container_index += 1 - if len(network_params.containers) < (container_index + 1): - break - req_payload["network"]["_object_parameters"]["network"] = \ - str(network_params.containers[container_index]) - - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - assert 'network' in r.json() - allocated_network = r.json()['network'] - if ip_version == 4: - return V4ServiceNetwork(v4=allocated_network) - else: - return V6ServiceNetwork(v6=allocated_network) - - -def allocate_service_ipv4_network(service_type, comment="", extattrs={} - ) -> V4ServiceNetwork: - """ - Allocate IPv4 network within the container of the specified service type. - """ - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert hasattr(ipam_params, service_type) \ - and service_type != 'INFOBLOX', "Invalid service type." - return _allocate_network(ipam_params.INFOBLOX, - getattr(ipam_params, service_type).V4, - 4, - comment, - extattrs) - - -def allocate_service_ipv6_network(service_type, comment="", extattrs={} - ) -> V6ServiceNetwork: - """ - Allocate IPv6 network within the container of the specified service type. - """ - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - assert hasattr(ipam_params, service_type) \ - and service_type != 'INFOBLOX', "Invalid service type." - return _allocate_network(ipam_params.INFOBLOX, - getattr(ipam_params, service_type).V6, - 6, - comment, - extattrs) - - -def _find_next_available_ip(infoblox_params, network_ref): - r = requests.post( - f'{_wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1', # noqa: E501 - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - # TODO: propagate no more available IPs in the network - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert 'ips' in r.json() - received_ip = r.json()['ips'] - assert len(received_ip) == 1 - return received_ip[0] - - -def _allocate_host(hostname=None, addr=None, network=None, extattrs={} - ) -> Union[V4HostAddress, V6HostAddress]: - """ - If network is not None, allocate host in that network. - Otherwise if addr is not None, allocate host with that address. - hostname parameter must be full name including domain name. - """ - # TODO: should hostnames be unique - # (i.e. fail if hostname already exists in this domain/service)? - assert addr or network, \ - "You must specify either the host address or the network CIDR." - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - if network: - ip_version = _ip_network_version(network) - # Find the next available IP address in the network - network_info = _find_networks(network=network, ip_version=ip_version) - assert len(network_info) == 1, \ - "Network does not exist. Create it first." - assert '_ref' in network_info[0] - addr = _find_next_available_ip(infoblox_params, - network_info[0]["_ref"]) - - else: - ip_version = _ip_addr_version(addr) - - ip_req_payload = { - f"ipv{ip_version}addrs": [ - { - f"ipv{ip_version}addr": addr - } - ], - "name": hostname, - "configure_for_dns": False, - "view": "default", - "extattrs": extattrs - } - - r = requests.post( - f'{_wapi(infoblox_params)}/record:host', - json=ip_req_payload, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert isinstance(r.json(), str) - assert r.json().startswith("record:host/") - - dns_req_payload = { - f"ipv{ip_version}addr": addr, - "name": hostname, - "view": "default", - "extattrs": extattrs - } - - endpoint = 'record:a' if ip_version == 4 else 'record:aaaa' - - r = requests.post( - f'{_wapi(infoblox_params)}/{endpoint}', - json=dns_req_payload, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert isinstance(r.json(), str) - assert r.json().startswith(f"{endpoint}/") - - if ip_version == 4: - return V4HostAddress(v4=addr) - else: - return V6HostAddress(v6=addr) - - -def allocate_service_host(hostname=None, - service_type=None, - service_networks: ServiceNetworks = None, - host_addresses: HostAddresses = None, - extattrs={} - ) -> HostAddresses: - """ - Allocate host with both IPv4 and IPv6 address (and respective DNS - records). - The domain name is also taken from the service type and appended to - specified hostname. - If service_networks is provided, that one is used. - If service_networks is not provided, and host_addresses is provided, - those specific addresses are used. - If neither is not provided, the first network with available space for - this service type is used. - Note that if WFO will always specify the network/addresses after - creating it, this mode won't be needed. Currently this mode doesn't - look further than the first container, so if needed, this will need - to be updated. - """ - oss = settings.load_oss_params() - assert oss.IPAM - ipam_params = oss.IPAM - - assert hasattr(ipam_params, service_type) \ - and service_type != 'INFOBLOX', "Invalid service type." - ipv4_containers = getattr(ipam_params, service_type).V4.containers - ipv6_containers = getattr(ipam_params, service_type).V6.containers - domain_name = getattr(ipam_params, service_type).domain_name - - # IPv4 - if not service_networks and not host_addresses: - ipv4_networks_info = _find_networks( - network_container=str(ipv4_containers[0]), ip_version=4) - assert len(ipv4_networks_info) >= 1, \ - "No IPv4 network exists in the container for this service type." - first_nonfull_ipv4_network = None - for ipv4_network_info in ipv4_networks_info: - assert 'network' in ipv4_network_info - capacity = _get_network_capacity(ipv4_network_info["network"]) - if capacity < 1000: - first_nonfull_ipv4_network = ipv4_network_info["network"] - break - # Create a new network if the existing networks in the container for - # the service type are all full. - if not first_nonfull_ipv4_network: - first_nonfull_ipv4_network = str(allocate_service_ipv4_network( - service_type=service_type).v4) - assert first_nonfull_ipv4_network, \ - "No available IPv4 addresses for this service type." - v4_host = _allocate_host(hostname=hostname+domain_name, - network=first_nonfull_ipv4_network, - extattrs=extattrs) - elif service_networks: - network = service_networks.v4 - assert any(network.subnet_of(ipv4_container) - for ipv4_container in ipv4_containers) - v4_host = _allocate_host(hostname=hostname+domain_name, - network=str(network), - extattrs=extattrs) - elif host_addresses: - addr = host_addresses.v4 - assert any(addr in ipv4_container - for ipv4_container in ipv4_containers) - v4_host = _allocate_host(hostname=hostname+domain_name, - addr=str(addr), - extattrs=extattrs) - - # IPv6 - if not service_networks and not host_addresses: - # ipv6 does not support capacity fetching (not even the GUI displays - # it). Maybe it's assumed that there is always available space? - ipv6_networks_info = _find_networks( - network_container=str(ipv6_containers[0]), - ip_version=6) - assert len(ipv6_networks_info) >= 1, \ - "No IPv6 network exists in the container for this service type." - assert 'network' in ipv6_networks_info[0] - # TODO: if "no available IP" error, create a new network? - v6_host = _allocate_host(hostname=hostname+domain_name, - network=ipv6_networks_info[0]['network'], - extattrs=extattrs) - elif service_networks: - network = service_networks.v6 - assert any(network.subnet_of(ipv6_container) - for ipv6_container in ipv6_containers) - v6_host = _allocate_host(hostname=hostname+domain_name, - network=str(network), - extattrs=extattrs) - elif host_addresses: - addr = host_addresses.v6 - assert any(addr in ipv6_container - for ipv6_container in ipv6_containers) - v6_host = _allocate_host(hostname=hostname+domain_name, - addr=str(addr), - extattrs=extattrs) - - return HostAddresses(v4=v4_host.v4, v6=v6_host.v6) - - -""" -Below methods are not used for supported outside calls -""" - -''' -def _find_containers(network=None, ip_version=4): - """ - If network is not None, find that container. - Otherwise find all containers. - """ - assert ip_version in [4, 6] - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - endpoint = 'networkcontainer' if ip_version == 4 \ - else 'ipv6networkcontainer' - r = requests.get( - f'{_wapi(infoblox_params)}/{endpoint}', - params={'network': network} if network else None, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - return r.json() - - -def _delete_network(network) -> Union[V4ServiceNetwork, V6ServiceNetwork]: - """ - Delete IPv4 or IPv6 network by CIDR. - """ - # TODO: should we check that there are no hosts in this network before - # deleting? Deleting a network deletes the hosts in it, but not the - # associated DNS records. - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - ip_version = _ip_network_version(network) - - network_info = _find_networks(network=network, ip_version=ip_version) - assert len(network_info) == 1, "Network does not exist." - assert '_ref' in network_info[0] - - r = requests.delete( - f'{_wapi(infoblox_params)}/{network_info[0]["_ref"]}', - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - # Extract ipv4/ipv6 address from the network reference obtained in the - # response - r_json = r.json() - network_address = ipaddress.ip_network( - r_json.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) - if ip_version == 4: - return V4ServiceNetwork(v4=network_address) - else: - return V6ServiceNetwork(v6=network_address) - - -def _delete_host_by_ip(addr) -> Union[V4HostAddress, V6HostAddress]: - """ - Delete IPv4 or IPv6 host by its address. - """ - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - ip_version = _ip_addr_version(addr) - ip_param = 'ipv4addr' if ip_version == 4 else 'ipv6addr' - - # Find host record reference - r = requests.get( - f'{_wapi(infoblox_params)}/record:host', - params={ip_param: addr}, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - host_data = r.json() - assert len(host_data) == 1, "Host does not exist." - assert '_ref' in host_data[0] - host_ref = host_data[0]['_ref'] - - # Delete it - r = requests.delete( - f'{_wapi(infoblox_params)}/{host_ref}', - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - # Also find and delete the associated dns a/aaaa record - endpoint = 'record:a' if ip_version == 4 else 'record:aaaa' - - r = requests.get( - f'{_wapi(infoblox_params)}/{endpoint}', - params={ip_param: addr}, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - dns_data = r.json() - assert len(dns_data) == 1, "DNS record does not exist." - assert '_ref' in dns_data[0] - dns_ref = dns_data[0]['_ref'] - - r = requests.delete( - f'{_wapi(infoblox_params)}/{dns_ref}', - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - - if ip_version == 4: - return V4HostAddress(v4=addr) - else: - return V6HostAddress(v6=addr) - - -def _get_network_usage_status(network): - """ - Get status and usage fields of all hosts in the specified ipv4 or ipv6 - network. - """ - oss = settings.load_oss_params() - assert oss.IPAM.INFOBLOX - infoblox_params = oss.IPAM.INFOBLOX - - ip_version = _ip_network_version(network) - endpoint = 'ipv4address' if ip_version == 4 else 'ipv6address' - - r = requests.get( - f'{_wapi(infoblox_params)}/{endpoint}', - params={ - '_return_fields': 'ip_address,status,usage', - 'network': network}, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False - ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - return r.json() -''' -''' -if __name__ == '__main__': - while True: - print("1. Find all containers") - print("2. Find all networks") - print("3. Get network capacity") - print("4. Create new network") - print("5. Delete network") - print("6. Allocate host by IP") - print("7. Allocate host by network CIDR") - print("8. Allocate host by service type") - print("9. Delete host by IP") - print("10. Get network usage status") - print("11. Exit") - - choice = input("Enter your choice: ") - - if choice == '1': - ip_version = int(input("Enter IP version (4 or 6): ")) - containers = _find_containers(ip_version=ip_version) - print(json.dumps(containers, indent=2)) - - elif choice == '2': - ip_version = int(input("Enter IP version (4 or 6): ")) - networks = _find_networks(ip_version=ip_version) - print(json.dumps(networks, indent=2)) - - elif choice == '3': - network = input("Enter network (in CIDR notation): ") - network_capacity = _get_network_capacity(network=network) - print(json.dumps(network_capacity, indent=2)) - - elif choice == '4': - service_type = input("Enter service type: ") - comment = input("Enter a comment for the network: ") - ip_version = int(input("Enter IP version (4 or 6): ")) - if ip_version == 4: - new_network = allocate_service_ipv4_network( - comment=comment, service_type=service_type) - elif ip_version == 6: - new_network = allocate_service_ipv6_network( - comment=comment, service_type=service_type) - else: - print("Invalid IP version. Please enter either 4 or 6.") - continue - print(json.dumps(str(new_network), indent=2)) - - elif choice == '5': - network = input("Enter network to delete (in CIDR notation): ") - deleted_network = _delete_network(network=network) - print(json.dumps(str(deleted_network), indent=2)) - - elif choice == '6': - hostname = input("Enter host name (full name w/ domain name): ") - addr = input("Enter IP address to allocate: ") - alloc_ip = _allocate_host(hostname=hostname, addr=addr) - print(json.dumps(str(alloc_ip), indent=2)) - - elif choice == '7': - hostname = input("Enter host name (full name w/ domain name): ") - network = input( - "Enter existing network to allocate from (CIDR notation): ") - alloc_ip = _allocate_host(hostname=hostname, network=network) - print(json.dumps(str(alloc_ip), indent=2)) - - elif choice == '8': - hostname = input("Enter host name (w/o domain name): ") - service_type = input("Enter service type: ") - alloc_ip = allocate_service_host( - hostname=hostname, - service_type=service_type) - print(json.dumps(str(alloc_ip), indent=2)) - - elif choice == '9': - addr = input("Enter IP address of host to delete: ") - deleted_host = _delete_host_by_ip(addr=addr) - print(json.dumps(str(deleted_host), indent=2)) - - elif choice == '10': - network = input( - "Enter network to get host usage status (CIDR notation): ") - usage_status_info = _get_network_usage_status(network=network) - print(json.dumps(usage_status_info, indent=2)) - - elif choice == '11': - print("Exiting...") - break - - else: - print("Invalid choice. Please try again.") -''' +import ipaddress +import requests +from enum import Enum +from pydantic import BaseSettings +from requests.auth import HTTPBasicAuth +from typing import Union + +from gso import settings + + +class V4ServiceNetwork(BaseSettings): + v4: ipaddress.IPv4Network + + +class V6ServiceNetwork(BaseSettings): + v6: ipaddress.IPv6Network + + +class ServiceNetworks(BaseSettings): + v4: ipaddress.IPv4Network + v6: ipaddress.IPv6Network + + +class V4HostAddress(BaseSettings): + v4: ipaddress.IPv4Address + + +class V6HostAddress(BaseSettings): + v6: ipaddress.IPv6Address + + +class HostAddresses(BaseSettings): + v4: ipaddress.IPv4Address + v6: ipaddress.IPv6Address + + +class IPAMErrors(Enum): + # HTTP error code, match in error message + CONTAINER_FULL = 400, "Can not find requested number of networks" + NETWORK_FULL = 400, \ + "Cannot find 1 available IP address(es) in this network" + EXTATTR_UNKNOWN = 400, "Unknown extensible attribute" + EXTATTR_BADVALUE = 400, "Bad value for extensible attribute" + + +# TODO: remove this! +# lab infoblox cert is not valid for the ipv4 address +# ... disable warnings for now +requests.packages.urllib3.disable_warnings() + + +def _match_error_code(response, error_code): + return response.status_code == error_code.value[0] \ + and error_code.value[1] in response.text + + +def _wapi(infoblox_params: settings.InfoBloxParams): + return (f'https://{infoblox_params.host}' + f'/wapi/{infoblox_params.wapi_version}') + + +def _ip_addr_version(addr): + ip_version = None + ip_addr = ipaddress.ip_address(addr) + if isinstance(ip_addr, ipaddress.IPv4Address): + ip_version = 4 + elif isinstance(ip_addr, ipaddress.IPv6Address): + ip_version = 6 + assert ip_version in [4, 6] + return ip_version + + +def _ip_network_version(network): + ip_version = None + ip_network = ipaddress.ip_network(network) + if isinstance(ip_network, ipaddress.IPv4Network): + ip_version = 4 + elif isinstance(ip_network, ipaddress.IPv6Network): + ip_version = 6 + assert ip_version in [4, 6] + return ip_version + + +def _find_networks(network_container=None, network=None, ip_version=4): + """ + If network_container is not None, find all networks within the specified + container. + Otherwise, if network is not None, find the specified network. + Otherwise find all networks. + """ + assert ip_version in [4, 6] + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + endpoint = 'network' if ip_version == 4 else 'ipv6network' + params = None + if network_container: + params = {'network_container': network_container} + elif network: + params = {'network': network} + r = requests.get( + f'{_wapi(infoblox_params)}/{endpoint}', + params=params, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + # TODO: propagate "network not found" error to caller + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + return r.json() + + +def _allocate_network( + infoblox_params: settings.InfoBloxParams, + network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams], + ip_version=4, + comment="", + extattrs={} +) -> Union[V4ServiceNetwork, V6ServiceNetwork]: + assert ip_version in [4, 6] + endpoint = 'network' if ip_version == 4 else 'ipv6network' + ip_container = 'networkcontainer' if ip_version == 4 else \ + 'ipv6networkcontainer' + + assert network_params.containers, \ + "No containers available to allocate networks for this service." \ + "Maybe you want to allocate a host from a network directly?" + + # only return in the response the allocated network, not all available + # TODO: any validation needed for extrattrs wherever it's used? + req_payload = { + "network": { + "_object_function": "next_available_network", + "_parameters": { + "cidr": network_params.mask + }, + "_object": ip_container, + "_object_parameters": { + "network": str(network_params.containers[0]) + }, + "_result_field": "networks", + }, + "comment": comment, + "extattrs": extattrs + } + + container_index = 0 + while True: + r = requests.post( + f'{_wapi(infoblox_params)}/{endpoint}', + params={'_return_fields': 'network'}, + json=req_payload, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + headers={'content-type': "application/json"}, + verify=False + ) + if not _match_error_code(response=r, + error_code=IPAMErrors.CONTAINER_FULL): + break + # Container full: try with next valid container for service (if any) + container_index += 1 + if len(network_params.containers) < (container_index + 1): + break + req_payload["network"]["_object_parameters"]["network"] = \ + str(network_params.containers[container_index]) + + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + assert 'network' in r.json() + allocated_network = r.json()['network'] + if ip_version == 4: + return V4ServiceNetwork(v4=allocated_network) + else: + return V6ServiceNetwork(v6=allocated_network) + + +def allocate_service_ipv4_network(service_type, comment="", extattrs={} + ) -> V4ServiceNetwork: + """ + Allocate IPv4 network within the container of the specified service type. + """ + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert hasattr(ipam_params, service_type) \ + and service_type != 'INFOBLOX', "Invalid service type." + return _allocate_network(ipam_params.INFOBLOX, + getattr(ipam_params, service_type).V4, + 4, + comment, + extattrs) + + +def allocate_service_ipv6_network(service_type, comment="", extattrs={} + ) -> V6ServiceNetwork: + """ + Allocate IPv6 network within the container of the specified service type. + """ + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + assert hasattr(ipam_params, service_type) \ + and service_type != 'INFOBLOX', "Invalid service type." + return _allocate_network(ipam_params.INFOBLOX, + getattr(ipam_params, service_type).V6, + 6, + comment, + extattrs) + + +def _find_next_available_ip(infoblox_params, network_ref): + """ + Find the next available IP address from a network given its ref. + Returns "NETWORK_FULL" if there's no space in the network. + Otherwise returns the next available IP address in the network. + """ + r = requests.post( + f'{_wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1', # noqa: E501 + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + + if _match_error_code(response=r, + error_code=IPAMErrors.NETWORK_FULL): + return "NETWORK_FULL" + + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert 'ips' in r.json() + received_ip = r.json()['ips'] + assert len(received_ip) == 1 + return received_ip[0] + + +def _allocate_host(hostname=None, + addrs=None, + networks=None, + cname_aliases=None, + extattrs={} + ) -> Union[HostAddresses, str]: + """ + If networks is not None, allocate host in those networks. + Otherwise if addrs is not None, allocate host with those addresses. + hostname parameter must be full name including domain name. + Return an error string if couldn't allocate host due to network full. + """ + # TODO: should hostnames be unique + # (i.e. fail if hostname already exists in this domain/service)? + assert addrs or networks, \ + "You must specify either the host addresses or the networks CIDR." + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + + if networks: + ipv4_network = networks[0] + ipv6_network = networks[1] + assert _ip_network_version(ipv4_network) == 4 + assert _ip_network_version(ipv6_network) == 6 + + # Find the next available IP address in each network + network_info = _find_networks(network=ipv4_network, ip_version=4) + assert len(network_info) == 1, \ + "IPv4 Network does not exist. Create it first." + assert '_ref' in network_info[0] + ipv4_addr = _find_next_available_ip(infoblox_params, + network_info[0]["_ref"]) + + network_info = _find_networks(network=ipv6_network, ip_version=6) + assert len(network_info) == 1, \ + "IPv6 Network does not exist. Create it first." + assert '_ref' in network_info[0] + ipv6_addr = _find_next_available_ip(infoblox_params, + network_info[0]["_ref"]) + + # If couldn't find next available IPs, return error + if ipv4_addr == "NETWORK_FULL" or ipv6_addr == "NETWORK_FULL": + if ipv4_addr == "NETWORK_FULL": + return "IPV4_NETWORK_FULL" + if ipv6_addr == "NETWORK_FULL": + return "IPV6_NETWORK_FULL" + + else: + ipv4_addr = addrs[0] + ipv6_addr = addrs[1] + assert _ip_addr_version(ipv4_addr) == 4 + assert _ip_addr_version(ipv6_addr) == 6 + + req_payload = { + "ipv4addrs": [ + { + "ipv4addr": ipv4_addr + } + ], + "ipv6addrs": [ + { + "ipv6addr": ipv6_addr + } + ], + "name": hostname, + "configure_for_dns": True, + "view": "default", + "extattrs": extattrs + } + + r = requests.post( + f'{_wapi(infoblox_params)}/record:host', + json=req_payload, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert isinstance(r.json(), str) + assert r.json().startswith("record:host/") + + if cname_aliases: + + cname_req_payload = { + "name": "", + "canonical": hostname, + "view": "default", + "extattrs": extattrs + } + + for alias in cname_aliases: + cname_req_payload["name"] = alias + r = requests.post( + f'{_wapi(infoblox_params)}/record:cname', + json=cname_req_payload, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert r.json().startswith("record:cname/") + + return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), + v6=ipaddress.ip_address(ipv6_addr)) + + +def allocate_service_host(hostname=None, + service_type=None, + service_networks: ServiceNetworks = None, + host_addresses: HostAddresses = None, + cname_aliases=None, + extattrs={} + ) -> HostAddresses: + """ + Allocate host record with both IPv4 and IPv6 address, and respective DNS + A and AAAA records. + - If service_networks is provided, and it's in a valid container, + that one is used. + - If service_networks is not provided, and host_addresses is provided, + those specific addresses are used. + - If neither is provided: + - If service has configured containers, new ipv4 and ipv6 networks are + created and those are used. Note that in this case extattrs is for the + hosts and not for the networks. + - If service doesn't have configured containers and has configured + networks instead, the configured networks are used (they are filled up + in order of appearance in the configuration file). + The domain name is taken from the service type and appended to the + specified hostname. + """ + oss = settings.load_oss_params() + assert oss.IPAM + ipam_params = oss.IPAM + + assert hasattr(ipam_params, service_type) \ + and service_type != 'INFOBLOX', "Invalid service type." + oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers + oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers + oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks + oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks + domain_name = getattr(ipam_params, service_type).domain_name + + assert (oss_ipv4_containers and oss_ipv6_containers) \ + or (oss_ipv4_networks and oss_ipv6_networks), \ + "This service is missing either containers or networks configuration." + assert domain_name, "This service is missing domain_name configuration." + + if cname_aliases: + cname_aliases = [alias + domain_name for alias in cname_aliases] + + if not service_networks and not host_addresses: + if oss_ipv4_containers and oss_ipv6_containers: + # This service has configured containers. + # Use them to allocate new networks that can allocate the hosts. + + # IPv4 + ipv4_network = str(allocate_service_ipv4_network( + service_type=service_type).v4) + assert ipv4_network, \ + "No available space for IPv4 networks for this service type." + + # IPv6 + ipv6_network = str(allocate_service_ipv6_network( + service_type=service_type).v6) + assert ipv6_network, \ + "No available space for IPv6 networks for this service type." + + elif oss_ipv4_networks and oss_ipv6_networks: + # This service has configured networks. + # Allocate a host inside an ipv4 and ipv6 network from among them. + ipv4_network = str(oss_ipv4_networks[0]) + ipv6_network = str(oss_ipv6_networks[0]) + + while True: + ipv4_network_index = 0 + ipv6_network_index = 0 + network_tuple = (ipv4_network, ipv6_network) + host = _allocate_host(hostname=hostname+domain_name, + networks=network_tuple, + cname_aliases=cname_aliases, + extattrs=extattrs) + + if "NETWORK_FULL" not in host: + break + elif "IPV4" in host: + ipv4_network_index += 1 + assert ipv4_network_index < len(oss_ipv4_networks), \ + "No available space in any IPv4 network for this service." + ipv4_network = str(oss_ipv4_networks[ipv4_network_index]) + else: # IPV6 in host + ipv6_network_index += 1 + assert ipv6_network_index < len(oss_ipv6_networks), \ + "No available space in any IPv6 network for this service." + ipv6_network = str(oss_ipv6_networks[ipv6_network_index]) + + elif service_networks: + # IPv4 + ipv4_network = service_networks.v4 + if oss_ipv4_containers: + assert any(ipv4_network.subnet_of(oss_ipv4_container) + for oss_ipv4_container in oss_ipv4_containers) + else: + assert ipv4_network in oss_ipv4_networks + + # IPv6 + ipv6_network = service_networks.v6 + if oss_ipv6_containers: + assert any(ipv6_network.subnet_of(oss_ipv6_container) + for oss_ipv6_container in oss_ipv6_containers) + else: + assert ipv6_network in oss_ipv6_networks + + host = _allocate_host( + hostname=hostname+domain_name, + networks=(str(ipv4_network), str(ipv6_network)), + cname_aliases=cname_aliases, + extattrs=extattrs + ) + assert "NETWORK_FULL" not in host + + elif host_addresses: + # IPv4 + ipv4_addr = host_addresses.v4 + if oss_ipv4_containers: + assert any(ipv4_addr in oss_ipv4_container + for oss_ipv4_container in oss_ipv4_containers) + else: + assert any(ipv4_addr in oss_ipv4_network + for oss_ipv4_network in oss_ipv4_networks) + + # IPv6 + ipv6_addr = host_addresses.v6 + if oss_ipv6_containers: + assert any(ipv6_addr in oss_ipv6_container + for oss_ipv6_container in oss_ipv6_containers) + else: + assert any(ipv4_addr in oss_ipv6_network + for oss_ipv6_network in oss_ipv6_networks) + + host = _allocate_host( + hostname=hostname+domain_name, + addrs=(str(ipv4_addr), str(ipv6_addr)), + cname_aliases=cname_aliases, + extattrs=extattrs + ) + assert "NETWORK_FULL" not in host + + return host + + +""" +Below methods are not used for supported outside calls +""" + +''' +def _find_containers(network=None, ip_version=4): + """ + If network is not None, find that container. + Otherwise find all containers. + """ + assert ip_version in [4, 6] + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + endpoint = 'networkcontainer' if ip_version == 4 \ + else 'ipv6networkcontainer' + r = requests.get( + f'{_wapi(infoblox_params)}/{endpoint}', + params={'network': network} if network else None, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + return r.json() + + +def _get_network_capacity(network=None): + """ + Get utilization of a IPv4 network in a fraction of 1000. + """ + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + + ip_version = _ip_network_version(network) + assert ip_version == 4, "Utilization is only available for IPv4 networks." + params = { + 'network': network, + '_return_fields': 'network,total_hosts,utilization' + } + + r = requests.get( + f'{_wapi(infoblox_params)}/network', + params=params, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + # Utilization info takes several minutes to converge. + # The IPAM utilization bar in the GUI as well. Why? + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + capacity_info = r.json() + assert len(capacity_info) == 1, "Requested IPv4 network doesn't exist." + assert 'utilization' in capacity_info[0] + utilization = capacity_info[0]['utilization'] + return utilization + + +def _delete_network(network) -> Union[V4ServiceNetwork, V6ServiceNetwork]: + """ + Delete IPv4 or IPv6 network by CIDR. + """ + # TODO: should we check that there are no hosts in this network before + # deleting? Deleting a network deletes the hosts in it, but not the + # associated DNS records. + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + + ip_version = _ip_network_version(network) + + network_info = _find_networks(network=network, ip_version=ip_version) + assert len(network_info) == 1, "Network does not exist." + assert '_ref' in network_info[0] + + r = requests.delete( + f'{_wapi(infoblox_params)}/{network_info[0]["_ref"]}', + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + # Extract ipv4/ipv6 address from the network reference obtained in the + # response + r_json = r.json() + network_address = ipaddress.ip_network( + r_json.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) + if ip_version == 4: + return V4ServiceNetwork(v4=network_address) + else: + return V6ServiceNetwork(v6=network_address) + + +def _delete_host_by_ip(addr) -> Union[V4HostAddress, V6HostAddress]: + """ + Delete IPv4 or IPv6 host by its address. + """ + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + + ip_version = _ip_addr_version(addr) + ip_param = 'ipv4addr' if ip_version == 4 else 'ipv6addr' + + # Find host record reference + r = requests.get( + f'{_wapi(infoblox_params)}/record:host', + params={ip_param: addr}, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + host_data = r.json() + assert len(host_data) == 1, "Host does not exist." + assert '_ref' in host_data[0] + host_ref = host_data[0]['_ref'] + + # Delete it + r = requests.delete( + f'{_wapi(infoblox_params)}/{host_ref}', + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + # Also find and delete the associated dns a/aaaa record + endpoint = 'record:a' if ip_version == 4 else 'record:aaaa' + + r = requests.get( + f'{_wapi(infoblox_params)}/{endpoint}', + params={ip_param: addr}, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + dns_data = r.json() + assert len(dns_data) == 1, "DNS record does not exist." + assert '_ref' in dns_data[0] + dns_ref = dns_data[0]['_ref'] + + r = requests.delete( + f'{_wapi(infoblox_params)}/{dns_ref}', + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + + if ip_version == 4: + return V4HostAddress(v4=addr) + else: + return V6HostAddress(v6=addr) + + +def _get_network_usage_status(network): + """ + Get status and usage fields of all hosts in the specified ipv4 or ipv6 + network. + """ + oss = settings.load_oss_params() + assert oss.IPAM.INFOBLOX + infoblox_params = oss.IPAM.INFOBLOX + + ip_version = _ip_network_version(network) + endpoint = 'ipv4address' if ip_version == 4 else 'ipv6address' + + r = requests.get( + f'{_wapi(infoblox_params)}/{endpoint}', + params={ + '_return_fields': 'ip_address,status,usage', + 'network': network}, + auth=HTTPBasicAuth(infoblox_params.username, + infoblox_params.password), + verify=False + ) + assert r.status_code >= 200 and r.status_code < 300, \ + f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + return r.json() +''' diff --git a/gso/services/ipam.py b/gso/services/ipam.py index c41a1239a14506e3104aaad5d5e4dcdfadd4dcae..96c957d07883ac644fa202e69614e5bb6049dacf 100644 --- a/gso/services/ipam.py +++ b/gso/services/ipam.py @@ -46,68 +46,64 @@ def new_service_host(hostname, service_type, service_networks: ServiceNetworks = None, host_addresses: HostAddresses = None, + cname_aliases=None, extattrs={}) -> HostAddresses: return _ipam.allocate_service_host( hostname=hostname, service_type=service_type, service_networks=service_networks, host_addresses=host_addresses, + cname_aliases=cname_aliases, extattrs=extattrs) -''' if __name__ == '__main__': # sample call flow to allocate two loopback interfaces and a trunk service - # new_service_host can be called passing networks or addresses - # - host h1 for service LO uses a specific ipv4/ipv6 address pair - # - the rest use the ipv4/ipv6 network pair + # new_service_host can be called passing networks, addresses, or nothing. + # - host h1 for service TRUNK uses a specific ipv4/ipv6 address pair + # - host h2 for service TRUNK uses the ipv4/ipv6 network pair + # - service LO uses nothing # networks and hosts can be allocated with extensible attributes - # - host h2 for service LO uses extattrs for both network and address/DNS - # - the rest don't use extattrs + # networks can be created with a comment + # CNAME records can be optionally created - hostname_A = 'h1' - hostname_B = 'h2' + hostname_A = 'hA' + hostname_B = 'hB' # h1 LO (loopback) - lo1_service_networks = new_service_networks( - service_type='LO', - comment="Network for h1 LO" - ) - lo1_v4_host_address = lo1_service_networks.v4.network_address - lo1_v6_host_address = lo1_service_networks.v6.network_address - print(lo1_v4_host_address) - print(lo1_v6_host_address) - lo1_host_addresses = HostAddresses(v4=lo1_v4_host_address, - v6=lo1_v6_host_address) - new_service_host(hostname=hostname_A, - service_type='LO', - host_addresses=lo1_host_addresses) + new_service_host(hostname=hostname_A+"_LO", + cname_aliases=["alias1.hA", "alias2.hA"], + service_type='LO') # h2 LO (loopback) - lo2_network_extattrs = { + new_service_host(hostname=hostname_B+"_LO", + cname_aliases=["alias1.hB"], + service_type='LO') + + # h1-h2 TRUNK + trunk12_network_extattrs = { "vrf_name": {"value": "dummy_vrf"}, } - lo2_host_extattrs = { + trunk12_host_extattrs = { "Site": {"value": "dummy_site"}, } - lo2_service_networks = \ - new_service_networks(service_type='LO', - comment="Network for h2 LO", - extattrs=lo2_network_extattrs) - new_service_host(hostname=hostname_B, - service_type='LO', - service_networks=lo2_service_networks, - extattrs=lo2_host_extattrs) - - # h1-h2 TRUNK trunk12_service_networks = new_service_networks( service_type='TRUNK', + extattrs=trunk12_network_extattrs, comment="Network for h1-h2 TRUNK" ) - new_service_host(hostname=hostname_A, + + trunk12_v4_host_address = trunk12_service_networks.v4.network_address + trunk12_v6_host_address = trunk12_service_networks.v6.network_address + trunk12_host_addresses = HostAddresses(v4=trunk12_v4_host_address, + v6=trunk12_v6_host_address) + + new_service_host(hostname=hostname_A+"_TRUNK", service_type='TRUNK', - service_networks=trunk12_service_networks) - new_service_host(hostname=hostname_B, + host_addresses=trunk12_host_addresses, + extattrs=trunk12_host_extattrs) + + new_service_host(hostname=hostname_B+"_TRUNK", service_type='TRUNK', - service_networks=trunk12_service_networks) -''' + service_networks=trunk12_service_networks, + extattrs=trunk12_host_extattrs) diff --git a/gso/settings.py b/gso/settings.py index 76e1dccfd4cce663aa61aeb50c8ebebc51eabb56..7167785cb32245f51316809db517bd09bbc251d8 100644 --- a/gso/settings.py +++ b/gso/settings.py @@ -18,11 +18,13 @@ class InfoBloxParams(BaseSettings): class V4NetworkParams(BaseSettings): containers: list[ipaddress.IPv4Network] + networks: list[ipaddress.IPv4Network] mask: int # TODO: validation on mask? class V6NetworkParams(BaseSettings): containers: list[ipaddress.IPv6Network] + networks: list[ipaddress.IPv6Network] mask: int # TODO: validation on mask? diff --git a/gso/workflows/device/create_device.py b/gso/workflows/device/create_device.py index 1a50b85ec465b9a8be98406491fa515cbdd21405..d7646e5da5b0ff2f484372266abf905d5302206e 100644 --- a/gso/workflows/device/create_device.py +++ b/gso/workflows/device/create_device.py @@ -19,6 +19,7 @@ from gso.products.product_types import device from gso.products.product_types.device import DeviceInactive, \ DeviceProvisioning from gso.products.product_types.site import Site +from gso.services import _ipam from gso.services import provisioning_proxy from gso.services.provisioning_proxy import await_pp_results, \ confirm_pp_results @@ -84,15 +85,14 @@ def iso_from_ipv4(ipv4_address): @step('Get information from IPAM') -def get_info_from_ipam(subscription: DeviceInactive) -> State: - # lo = ipam.new_device_lo_address() - # subscription.device.lo_ipv4_address = lo.v4 - # subscription.device.lo_ipv6_address = lo.v6 - # TODO: get info about how these should be generated - subscription.device.device_lo_ipv4_address = \ - ipaddress.ip_address('10.10.10.20') - subscription.device.device_lo_ipv6_address = \ - ipaddress.ip_address('fc00:798:10::20') +def get_info_from_ipam(subscription: DeviceProvisioning) -> State: + lo0_alias = re.sub('.geant.net', '', subscription.device.device_fqdn) + lo0_name = f'lo0.{lo0_alias}' + lo0_addr = _ipam.allocate_service_host(hostname=lo0_name, + service_type='LO', + cname_aliases=[lo0_alias]) + subscription.device.device_lo_ipv4_address = lo0_addr.v4 + subscription.device.device_lo_ipv6_address = lo0_addr.v6 subscription.device.device_lo_iso_address \ = iso_from_ipv4(str(subscription.device.device_lo_ipv4_address)) subscription.device.device_si_ipv4_network = '192.168.0.0/31' @@ -116,19 +116,19 @@ def initialize_subscription( subscription.device.device_vendor = device_vendor subscription.device.device_site \ = Site.from_subscription(device_site[0]).site - fqdn = str(hostname + '.' + - subscription.device.device_site.site_name.lower() + '.' + - subscription.device.device_site.site_country_code.lower() + - '.geant.net') + fqdn = f'{hostname}.{subscription.device.device_site.site_name.lower()}.' \ + f'{subscription.device.device_site.site_country_code.lower()}' \ + f'.geant.net' subscription.device.device_fqdn = fqdn subscription.device.device_role = device_role subscription.description = f'Device {fqdn} ' \ f'({subscription.device_type})' + subscription = device.DeviceProvisioning.from_other_lifecycle( subscription, SubscriptionLifecycle.PROVISIONING ) - return {'subscription': subscription, 'fqdn': fqdn} + return {'subscription': subscription} @step('Provision device [DRY RUN]') @@ -158,8 +158,8 @@ def create_device(): init >> create_subscription >> store_process_subscription(Target.CREATE) - >> get_info_from_ipam >> initialize_subscription + >> get_info_from_ipam >> provision_device_dry >> await_pp_results >> confirm_pp_results diff --git a/test/conftest.py b/test/conftest.py index 2ef1475af538d31f0a235e78bd42fb79a834d859..e714bf98ee8be66741b220731ba07ae1693ab16b 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -2,9 +2,10 @@ import contextlib import json import os import socket -import pytest import tempfile +import pytest + @pytest.fixture(scope='session') def configuration_data(): @@ -26,17 +27,21 @@ def configuration_data(): "password": "robot-user-password" }, "LO": { - "V4": {"containers": ["10.255.255.0/24"], "mask": 32}, - "V6": {"containers": ["dead:beef::/64"], "mask": 128}, + "V4": {"containers": ["10.255.255.0/24"], "networks": [], + "mask": 32}, + "V6": {"containers": ["dead:beef::/64"], "networks": [], + "mask": 128}, "domain_name": ".lo" }, "TRUNK": { "V4": { "containers": ["10.255.255.0/24", "10.255.254.0/24"], + "networks": [], "mask": 31 }, "V6": { "containers": ["dead:beef::/64", "dead:beee::/64"], + "networks": [], "mask": 126 }, "domain_name": ".trunk" @@ -44,10 +49,12 @@ def configuration_data(): "GEANT_IP": { "V4": { "containers": ["10.255.255.0/24", "10.255.254.0/24"], + "networks": [], "mask": 31 }, "V6": { "containers": ["dead:beef::/64", "dead:beee::/64"], + "networks": [], "mask": 126 }, "domain_name": ".geantip" diff --git a/test/test_ipam.py b/test/test_ipam.py index 2778148347a7fe84164305d86029efe3c5750ddc..23c1aec2ce6598a64d34e65f6cf421839af85629 100644 --- a/test/test_ipam.py +++ b/test/test_ipam.py @@ -1,5 +1,6 @@ import ipaddress import re + import responses from gso.services import ipam @@ -7,12 +8,12 @@ from gso.services import ipam @responses.activate def test_new_service_networks(data_config_filename): - responses.add( method=responses.POST, url=re.compile(r'.*/wapi.*/network.*'), json={ - '_ref': 'network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default', # noqa: E501 + '_ref': 'network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.2' + '55.20/32/default', # noqa: E501 'network': '10.255.255.20/32' } ) @@ -21,7 +22,8 @@ def test_new_service_networks(data_config_filename): method=responses.POST, url=re.compile(r'.*/wapi.*/ipv6network.*'), json={ - '_ref': 'ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default', # noqa: E501 + '_ref': 'ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:de' + 'ad%3Abeef%3A%3A18/128/default', # noqa: E501 'network': 'dead:beef::18/128' } ) @@ -35,23 +37,25 @@ def test_new_service_networks(data_config_filename): @responses.activate def test_new_service_host(data_config_filename): - responses.add( method=responses.POST, url=re.compile(r'.*/wapi.*/record:host$'), - json='record:host/ZG5zLmhvc3QkLm5vbl9ETlNfaG9zdF9yb290LjAuMTY4MzcwNTU4MzY3MC5nc28udGVzdA:test.lo/%20' # noqa: E501 + json='record:host/ZG5zLmhvc3QkLm5vbl9ETlNfaG9zdF9yb290LjAuMTY4MzcwNTU4' + 'MzY3MC5nc28udGVzdA:test.lo/%20' # noqa: E501 ) responses.add( method=responses.POST, url=re.compile(r'.*/wapi.*/record:a$'), - json='record:a/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default' # noqa: E501 + json='record:a/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS4' + '4:test.lo/default' # noqa: E501 ) responses.add( method=responses.POST, url=re.compile(r'.*/wapi.*/record:aaaa$'), - json='record:aaaa/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default' # noqa: E501 + json='record:aaaa/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1' + 'NS44:test.lo/default' # noqa: E501 ) responses.add( @@ -59,7 +63,8 @@ def test_new_service_host(data_config_filename): url=re.compile(r'.*/wapi.*/network.*'), json=[ { - "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.2" + "55.255.20/32/default", # noqa: E501 "network": "10.255.255.20/32", "network_view": "default" } @@ -72,7 +77,8 @@ def test_new_service_host(data_config_filename): url=re.compile(r'.*/wapi.*/ipv6network.*'), json=[ { - "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvM" + "A:dead%3Abeef%3A%3A18/128/default", # noqa: E501 "network": "dead:beef::18/128", "network_view": "default" } @@ -81,13 +87,17 @@ def test_new_service_host(data_config_filename): responses.add( method=responses.POST, - url=re.compile(r'.*/wapi.*/network.*/.*?_function=next_available_ip&num=1.*'), # noqa: E501 + url=re.compile( + r'.*/wapi.*/network.*/.*?_function=next_available_ip&num=1.*'), + # noqa: E501 json={'ips': ['10.255.255.20']} ) responses.add( method=responses.POST, - url=re.compile(r'.*/wapi.*/ipv6network.*/.*?_function=next_available_ip&num=1.*'), # noqa: E501 + url=re.compile( + r'.*/wapi.*/ipv6network.*/.*?_function=next_available_ip&num=1.*'), + # noqa: E501 json={'ips': ['dead:beef::18']} )