diff --git a/gso/products/product_blocks/device.py b/gso/products/product_blocks/device.py index 0fd8f693ff4a37ccd55bbc2d2ffddc15dce8b787..e40269fb5df9554ef44d95b72017114949208971 100644 --- a/gso/products/product_blocks/device.py +++ b/gso/products/product_blocks/device.py @@ -18,9 +18,9 @@ class DeviceRole(strEnum): amt = "amt" -class DeviceBlockInactive(ProductBlockModel, - lifecycle=[SubscriptionLifecycle.INITIAL], - product_block_name="DeviceBlock"): +class DeviceBlockInactive( + ProductBlockModel, lifecycle=[SubscriptionLifecycle.INITIAL], product_block_name="DeviceBlock" +): device_fqdn: Optional[str] = None device_ts_address: Optional[str] = None device_ts_port: Optional[int] = None @@ -36,8 +36,7 @@ class DeviceBlockInactive(ProductBlockModel, device_site: Optional[SiteBlockInactive] -class DeviceBlockProvisioning(DeviceBlockInactive, - lifecycle=[SubscriptionLifecycle.PROVISIONING]): +class DeviceBlockProvisioning(DeviceBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]): device_fqdn: str device_ts_address: str device_ts_port: int @@ -53,8 +52,7 @@ class DeviceBlockProvisioning(DeviceBlockInactive, device_site: Optional[SiteBlockProvisioning] -class DeviceBlock(DeviceBlockProvisioning, - lifecycle=[SubscriptionLifecycle.ACTIVE]): +class DeviceBlock(DeviceBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): device_fqdn: str device_ts_address: str device_ts_port: int diff --git a/gso/products/product_blocks/iptrunk.py b/gso/products/product_blocks/iptrunk.py index c74df8f7b0931ef43d2025bcecb62d3609d745c9..bebe9c636546ad8803cad356be179d07aec3af95 100644 --- a/gso/products/product_blocks/iptrunk.py +++ b/gso/products/product_blocks/iptrunk.py @@ -13,9 +13,9 @@ class IptrunkType(strEnum): Leased = "Leased" -class IptrunkBlockInactive(ProductBlockModel, - lifecycle=[SubscriptionLifecycle.INITIAL], - product_block_name="IptrunkBlock"): +class IptrunkBlockInactive( + ProductBlockModel, lifecycle=[SubscriptionLifecycle.INITIAL], product_block_name="IptrunkBlock" +): geant_s_sid: Optional[str] = None iptrunk_description: Optional[str] = None iptrunk_type: Optional[IptrunkType] = None @@ -29,19 +29,16 @@ class IptrunkBlockInactive(ProductBlockModel, iptrunk_sideA_ae_iface: Optional[str] = None iptrunk_sideA_ae_geant_a_sid: Optional[str] = None iptrunk_sideA_ae_members: list[str] = Field(default_factory=list) - iptrunk_sideA_ae_members_description: list[str] \ - = Field(default_factory=list) + iptrunk_sideA_ae_members_description: list[str] = Field(default_factory=list) # iptrunk_sideB_node: DeviceBlockInactive iptrunk_sideB_ae_iface: Optional[str] = None iptrunk_sideB_ae_geant_a_sid: Optional[str] = None iptrunk_sideB_ae_members: list[str] = Field(default_factory=list) - iptrunk_sideB_ae_members_description: list[str] \ - = Field(default_factory=list) + iptrunk_sideB_ae_members_description: list[str] = Field(default_factory=list) -class IptrunkBlockProvisioning(IptrunkBlockInactive, - lifecycle=[SubscriptionLifecycle.PROVISIONING]): +class IptrunkBlockProvisioning(IptrunkBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]): geant_s_sid: Optional[str] = None iptrunk_description: Optional[str] = None iptrunk_type: Optional[IptrunkType] = None @@ -55,19 +52,16 @@ class IptrunkBlockProvisioning(IptrunkBlockInactive, iptrunk_sideA_ae_iface: Optional[str] = None iptrunk_sideA_ae_geant_a_sid: Optional[str] = None iptrunk_sideA_ae_members: list[str] = Field(default_factory=list) - iptrunk_sideA_ae_members_description: list[str] = Field( - default_factory=list) + iptrunk_sideA_ae_members_description: list[str] = Field(default_factory=list) # iptrunk_sideB_node: DeviceBlockProvisioning iptrunk_sideB_ae_iface: Optional[str] = None iptrunk_sideB_ae_geant_a_sid: Optional[str] = None iptrunk_sideB_ae_members: list[str] = Field(default_factory=list) - iptrunk_sideB_ae_members_description: list[str] = Field( - default_factory=list) + iptrunk_sideB_ae_members_description: list[str] = Field(default_factory=list) -class IptrunkBlock(IptrunkBlockProvisioning, - lifecycle=[SubscriptionLifecycle.ACTIVE]): +class IptrunkBlock(IptrunkBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): geant_s_sid: str iptrunk_description: str iptrunk_type: IptrunkType @@ -81,12 +75,10 @@ class IptrunkBlock(IptrunkBlockProvisioning, iptrunk_sideA_ae_iface: str iptrunk_sideA_ae_geant_a_sid: str iptrunk_sideA_ae_members: list[str] = Field(default_factory=list) - iptrunk_sideA_ae_members_description: list[str] = Field( - default_factory=list) + iptrunk_sideA_ae_members_description: list[str] = Field(default_factory=list) # iptrunk_sideB_node: DeviceBlock iptrunk_sideB_ae_iface: str iptrunk_sideB_ae_geant_a_sid: str iptrunk_sideB_ae_members: list[str] = Field(default_factory=list) - iptrunk_sideB_ae_members_description: list[str] = Field( - default_factory=list) + iptrunk_sideB_ae_members_description: list[str] = Field(default_factory=list) diff --git a/gso/products/product_blocks/site.py b/gso/products/product_blocks/site.py index 28594041f0df562d4d10c4b93ab846af1bd89139..7ee60963a06cd9ecd4e01521f09b4d549723220f 100644 --- a/gso/products/product_blocks/site.py +++ b/gso/products/product_blocks/site.py @@ -11,9 +11,7 @@ class SiteTier(strEnum): tier4 = 4 -class SiteBlockInactive(ProductBlockModel, - lifecycle=[SubscriptionLifecycle.INITIAL], - product_block_name="SiteBlock"): +class SiteBlockInactive(ProductBlockModel, lifecycle=[SubscriptionLifecycle.INITIAL], product_block_name="SiteBlock"): site_name: Optional[str] = None site_city: Optional[str] = None site_country: Optional[str] = None @@ -25,8 +23,7 @@ class SiteBlockInactive(ProductBlockModel, site_tier: Optional[SiteTier] = None -class SiteBlockProvisioning(SiteBlockInactive, - lifecycle=[SubscriptionLifecycle.PROVISIONING]): +class SiteBlockProvisioning(SiteBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]): site_name: Optional[str] = None site_city: Optional[str] = None site_country: Optional[str] = None @@ -38,8 +35,7 @@ class SiteBlockProvisioning(SiteBlockInactive, site_tier: Optional[SiteTier] = None -class SiteBlock(SiteBlockProvisioning, - lifecycle=[SubscriptionLifecycle.ACTIVE]): +class SiteBlock(SiteBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): site_name: str site_city: str site_country: str diff --git a/gso/products/product_types/device.py b/gso/products/product_types/device.py index 2fa03627ffc5ba7578e555549757391256be6544..e7dc270367c82acc4ad3d86a0a303376120331da 100644 --- a/gso/products/product_types/device.py +++ b/gso/products/product_types/device.py @@ -14,8 +14,7 @@ class DeviceInactive(SubscriptionModel, is_base=True): device: DeviceBlockInactive -class DeviceProvisioning(DeviceInactive, - lifecycle=[SubscriptionLifecycle.PROVISIONING]): +class DeviceProvisioning(DeviceInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]): device_type: DeviceType device: DeviceBlockProvisioning diff --git a/gso/products/product_types/iptrunk.py b/gso/products/product_types/iptrunk.py index 6f0b40404dce5c1ad5733ee034a33ffe10604bfb..05d21e65059a433d8df17086d0be93bbeab31d68 100644 --- a/gso/products/product_types/iptrunk.py +++ b/gso/products/product_types/iptrunk.py @@ -8,8 +8,7 @@ class IptrunkInactive(SubscriptionModel, is_base=True): iptrunk: IptrunkBlockInactive -class IptrunkProvisioning(IptrunkInactive, - lifecycle=[SubscriptionLifecycle.PROVISIONING]): +class IptrunkProvisioning(IptrunkInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]): iptrunk: IptrunkBlockProvisioning diff --git a/gso/products/product_types/site.py b/gso/products/product_types/site.py index 7571acc42ea001b349bf7516f55a295ea20d06ed..76e51cfcdfef2f94602bf1dc86672f35eb97876c 100644 --- a/gso/products/product_types/site.py +++ b/gso/products/product_types/site.py @@ -8,8 +8,7 @@ class SiteInactive(SubscriptionModel, is_base=True): site: SiteBlockInactive -class SiteProvisioning(SiteInactive, - lifecycle=[SubscriptionLifecycle.PROVISIONING]): +class SiteProvisioning(SiteInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]): site: SiteBlockProvisioning diff --git a/gso/services/_ipam.py b/gso/services/_ipam.py index 89dd186eb82dd40851d47913e6dc033f09f16f34..8ef424eff2331b4c0c11936eb1fa051f0f5330b1 100644 --- a/gso/services/_ipam.py +++ b/gso/services/_ipam.py @@ -38,8 +38,7 @@ class HostAddresses(BaseSettings): class IPAMErrors(Enum): # HTTP error code, match in error message CONTAINER_FULL = 400, "Can not find requested number of networks" - NETWORK_FULL = 400, \ - "Cannot find 1 available IP address(es) in this network" + NETWORK_FULL = 400, "Cannot find 1 available IP address(es) in this network" EXTATTR_UNKNOWN = 400, "Unknown extensible attribute" EXTATTR_BADVALUE = 400, "Bad value for extensible attribute" @@ -51,13 +50,11 @@ requests.packages.urllib3.disable_warnings() def _match_error_code(response, error_code): - return response.status_code == error_code.value[0] \ - and error_code.value[1] in response.text + return response.status_code == error_code.value[0] and error_code.value[1] in response.text def _wapi(infoblox_params: settings.InfoBloxParams): - return (f'https://{infoblox_params.host}' - f'/wapi/{infoblox_params.wapi_version}') + return f"https://{infoblox_params.host}" f"/wapi/{infoblox_params.wapi_version}" def _ip_addr_version(addr): @@ -83,29 +80,32 @@ def _ip_network_version(network): def _assert_host_in_service( - ipv4_addr='', ipv6_addr='', - oss_ipv4_containers=None, oss_ipv6_containers=None, - oss_ipv4_networks=None, oss_ipv6_networks=None + ipv4_addr="", + ipv6_addr="", + oss_ipv4_containers=None, + oss_ipv6_containers=None, + oss_ipv4_networks=None, + oss_ipv6_networks=None, ): # IPv4 if oss_ipv4_containers: - assert any(ipv4_addr in oss_ipv4_container - for oss_ipv4_container in oss_ipv4_containers), \ - "Host's IPv4 address doesn't belong to service type." + assert any( + ipv4_addr in oss_ipv4_container for oss_ipv4_container in oss_ipv4_containers + ), "Host's IPv4 address doesn't belong to service type." else: - assert any(ipv4_addr in oss_ipv4_network - for oss_ipv4_network in oss_ipv4_networks), \ - "Host's IPv4 address doesn't belong to service type." + assert any( + ipv4_addr in oss_ipv4_network for oss_ipv4_network in oss_ipv4_networks + ), "Host's IPv4 address doesn't belong to service type." # IPv6 if oss_ipv6_containers: - assert any(ipv6_addr in oss_ipv6_container - for oss_ipv6_container in oss_ipv6_containers), \ - "Host's IPv6 address doesn't belong to service type." + assert any( + ipv6_addr in oss_ipv6_container for oss_ipv6_container in oss_ipv6_containers + ), "Host's IPv6 address doesn't belong to service type." else: - assert any(ipv6_addr in oss_ipv6_network - for oss_ipv6_network in oss_ipv6_networks), \ - "Host's IPv6 address doesn't belong to service type." + assert any( + ipv6_addr in oss_ipv6_network for oss_ipv6_network in oss_ipv6_networks + ), "Host's IPv6 address doesn't belong to service type." def _find_networks(network_container=None, network=None, ip_version=4): @@ -121,21 +121,19 @@ def _find_networks(network_container=None, network=None, ip_version=4): oss = settings.load_oss_params() assert oss.IPAM.INFOBLOX infoblox_params = oss.IPAM.INFOBLOX - endpoint = 'network' if ip_version == 4 else 'ipv6network' + endpoint = "network" if ip_version == 4 else "ipv6network" params = None if network_container: - params = {'network_container': network_container} + params = {"network_container": network_container} elif network: - params = {'network': network} + params = {"network": network} r = requests.get( - f'{_wapi(infoblox_params)}/{endpoint}', + f"{_wapi(infoblox_params)}/{endpoint}", params=params, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" return r.json() @@ -144,133 +142,106 @@ def _allocate_network( network_params: Union[settings.V4NetworkParams, settings.V6NetworkParams], ip_version=4, comment="", - extattrs={} + extattrs={}, ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: assert ip_version in [4, 6] - endpoint = 'network' if ip_version == 4 else 'ipv6network' - ip_container = 'networkcontainer' if ip_version == 4 else \ - 'ipv6networkcontainer' + endpoint = "network" if ip_version == 4 else "ipv6network" + ip_container = "networkcontainer" if ip_version == 4 else "ipv6networkcontainer" - assert network_params.containers, \ - "No containers available to allocate networks for this service." \ + assert network_params.containers, ( + "No containers available to allocate networks for this service." "Maybe you want to allocate a host from a network directly?" + ) # only return in the response the allocated network, not all available # TODO: any validation needed for extrattrs wherever it's used? req_payload = { "network": { "_object_function": "next_available_network", - "_parameters": { - "cidr": network_params.mask - }, + "_parameters": {"cidr": network_params.mask}, "_object": ip_container, - "_object_parameters": { - "network": str(network_params.containers[0]) - }, + "_object_parameters": {"network": str(network_params.containers[0])}, "_result_field": "networks", }, "comment": comment, - "extattrs": extattrs + "extattrs": extattrs, } container_index = 0 while True: r = requests.post( - f'{_wapi(infoblox_params)}/{endpoint}', - params={'_return_fields': 'network'}, + f"{_wapi(infoblox_params)}/{endpoint}", + params={"_return_fields": "network"}, json=req_payload, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - headers={'content-type': "application/json"}, - verify=False + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + headers={"content-type": "application/json"}, + verify=False, ) - if not _match_error_code(response=r, - error_code=IPAMErrors.CONTAINER_FULL): + if not _match_error_code(response=r, error_code=IPAMErrors.CONTAINER_FULL): break # Container full: try with next valid container for service (if any) container_index += 1 if len(network_params.containers) < (container_index + 1): break - req_payload["network"]["_object_parameters"]["network"] = \ - str(network_params.containers[container_index]) + req_payload["network"]["_object_parameters"]["network"] = str(network_params.containers[container_index]) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert 'network' in r.json() - allocated_network = r.json()['network'] + assert "network" in r.json() + allocated_network = r.json()["network"] if ip_version == 4: return V4ServiceNetwork(v4=ipaddress.ip_network(allocated_network)) else: return V6ServiceNetwork(v6=ipaddress.ip_network(allocated_network)) -def allocate_service_ipv4_network(service_type='', comment="", extattrs={} - ) -> V4ServiceNetwork: +def allocate_service_ipv4_network(service_type="", comment="", extattrs={}) -> V4ServiceNetwork: """ Allocate IPv4 network within the container of the specified service type. """ oss = settings.load_oss_params() assert oss.IPAM ipam_params = oss.IPAM - assert hasattr(ipam_params, service_type) \ - and service_type != 'INFOBLOX', "Invalid service type." - return _allocate_network(ipam_params.INFOBLOX, - getattr(ipam_params, service_type).V4, - 4, - comment, - extattrs) + assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." + return _allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V4, 4, comment, extattrs) -def allocate_service_ipv6_network(service_type='', comment="", extattrs={} - ) -> V6ServiceNetwork: +def allocate_service_ipv6_network(service_type="", comment="", extattrs={}) -> V6ServiceNetwork: """ Allocate IPv6 network within the container of the specified service type. """ oss = settings.load_oss_params() assert oss.IPAM ipam_params = oss.IPAM - assert hasattr(ipam_params, service_type) \ - and service_type != 'INFOBLOX', "Invalid service type." - return _allocate_network(ipam_params.INFOBLOX, - getattr(ipam_params, service_type).V6, - 6, - comment, - extattrs) + assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." + return _allocate_network(ipam_params.INFOBLOX, getattr(ipam_params, service_type).V6, 6, comment, extattrs) -def _find_next_available_ip(infoblox_params, network_ref=''): +def _find_next_available_ip(infoblox_params, network_ref=""): """ Find the next available IP address from a network given its ref. Returns "NETWORK_FULL" if there's no space in the network. Otherwise returns the next available IP address in the network. """ r = requests.post( - f'{_wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1', # noqa: E501 - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False + f"{_wapi(infoblox_params)}/{network_ref}?_function=next_available_ip&num=1", # noqa: E501 + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, ) - if _match_error_code(response=r, - error_code=IPAMErrors.NETWORK_FULL): + if _match_error_code(response=r, error_code=IPAMErrors.NETWORK_FULL): return "NETWORK_FULL" - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" - assert 'ips' in r.json() - received_ip = r.json()['ips'] + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert "ips" in r.json() + received_ip = r.json()["ips"] assert len(received_ip) == 1 return received_ip[0] -def _allocate_host(hostname='', - addrs=None, - networks=None, - cname_aliases=[], - dns_view="default", - extattrs={} - ) -> Union[HostAddresses, str]: +def _allocate_host( + hostname="", addrs=None, networks=None, cname_aliases=[], dns_view="default", extattrs={} +) -> Union[HostAddresses, str]: """ If networks is not None, allocate host in those networks. Otherwise if addrs is not None, allocate host with those addresses. @@ -282,8 +253,7 @@ def _allocate_host(hostname='', """ # TODO: should hostnames be unique # (i.e. fail if hostname already exists in this domain/service)? - assert addrs or networks, \ - "You must specify either the host addresses or the networks CIDR." + assert addrs or networks, "You must specify either the host addresses or the networks CIDR." oss = settings.load_oss_params() assert oss.IPAM.INFOBLOX infoblox_params = oss.IPAM.INFOBLOX @@ -298,16 +268,14 @@ def _allocate_host(hostname='', network_info = _find_networks(network=ipv4_network, ip_version=4) if len(network_info) != 1: return "IPV4_NETWORK_NOT_FOUND" - assert '_ref' in network_info[0] - ipv4_addr = _find_next_available_ip(infoblox_params, - network_info[0]["_ref"]) + assert "_ref" in network_info[0] + ipv4_addr = _find_next_available_ip(infoblox_params, network_info[0]["_ref"]) network_info = _find_networks(network=ipv6_network, ip_version=6) if len(network_info) != 1: return "IPV6_NETWORK_NOT_FOUND" - assert '_ref' in network_info[0] - ipv6_addr = _find_next_available_ip(infoblox_params, - network_info[0]["_ref"]) + assert "_ref" in network_info[0] + ipv6_addr = _find_next_available_ip(infoblox_params, network_info[0]["_ref"]) # If couldn't find next available IPs, return error if ipv4_addr == "NETWORK_FULL" or ipv6_addr == "NETWORK_FULL": @@ -323,67 +291,49 @@ def _allocate_host(hostname='', assert _ip_addr_version(ipv6_addr) == 6 req_payload = { - "ipv4addrs": [ - { - "ipv4addr": ipv4_addr - } - ], - "ipv6addrs": [ - { - "ipv6addr": ipv6_addr - } - ], + "ipv4addrs": [{"ipv4addr": ipv4_addr}], + "ipv6addrs": [{"ipv6addr": ipv6_addr}], "name": hostname, "configure_for_dns": True, "view": dns_view, - "extattrs": extattrs + "extattrs": extattrs, } r = requests.post( - f'{_wapi(infoblox_params)}/record:host', + f"{_wapi(infoblox_params)}/record:host", json=req_payload, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert isinstance(r.json(), str) assert r.json().startswith("record:host/") if cname_aliases: - - cname_req_payload = { - "name": "", - "canonical": hostname, - "view": dns_view, - "extattrs": extattrs - } + cname_req_payload = {"name": "", "canonical": hostname, "view": dns_view, "extattrs": extattrs} for alias in cname_aliases: cname_req_payload["name"] = alias r = requests.post( - f'{_wapi(infoblox_params)}/record:cname', + f"{_wapi(infoblox_params)}/record:cname", json=cname_req_payload, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" assert r.json().startswith("record:cname/") - return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), - v6=ipaddress.ip_address(ipv6_addr)) + return HostAddresses(v4=ipaddress.ip_address(ipv4_addr), v6=ipaddress.ip_address(ipv6_addr)) -def allocate_service_host(hostname='', - service_type='', - service_networks: ServiceNetworks = None, - host_addresses: HostAddresses = None, - cname_aliases=None, - extattrs={} - ) -> HostAddresses: +def allocate_service_host( + hostname="", + service_type="", + service_networks: ServiceNetworks = None, + host_addresses: HostAddresses = None, + cname_aliases=None, + extattrs={}, +) -> HostAddresses: """ Allocate host record with both IPv4 and IPv6 address, and respective DNS A and AAAA records. @@ -405,8 +355,7 @@ def allocate_service_host(hostname='', assert oss.IPAM ipam_params = oss.IPAM - assert hasattr(ipam_params, service_type) \ - and service_type != 'INFOBLOX', "Invalid service type." + assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks @@ -414,9 +363,9 @@ def allocate_service_host(hostname='', domain_name = getattr(ipam_params, service_type).domain_name dns_view = getattr(ipam_params, service_type).dns_view - assert (oss_ipv4_containers and oss_ipv6_containers) \ - or (oss_ipv4_networks and oss_ipv6_networks), \ - "This service is missing either containers or networks configuration." + assert (oss_ipv4_containers and oss_ipv6_containers) or ( + oss_ipv4_networks and oss_ipv6_networks + ), "This service is missing either containers or networks configuration." assert domain_name, "This service is missing domain_name configuration." assert dns_view, "This service is missing dns_view configuration." @@ -429,16 +378,12 @@ def allocate_service_host(hostname='', # Use them to allocate new networks that can allocate the hosts. # IPv4 - ipv4_network = str(allocate_service_ipv4_network( - service_type=service_type).v4) - assert ipv4_network, \ - "No available space for IPv4 networks for this service type." + ipv4_network = str(allocate_service_ipv4_network(service_type=service_type).v4) + assert ipv4_network, "No available space for IPv4 networks for this service type." # IPv6 - ipv6_network = str(allocate_service_ipv6_network( - service_type=service_type).v6) - assert ipv6_network, \ - "No available space for IPv6 networks for this service type." + ipv6_network = str(allocate_service_ipv6_network(service_type=service_type).v6) + assert ipv6_network, "No available space for IPv6 networks for this service type." elif oss_ipv4_networks and oss_ipv6_networks: # This service has configured networks. @@ -450,81 +395,76 @@ def allocate_service_host(hostname='', ipv6_network_index = 0 while True: network_tuple = (ipv4_network, ipv6_network) - host = _allocate_host(hostname=hostname+domain_name, - networks=network_tuple, - cname_aliases=cname_aliases, - dns_view=dns_view, - extattrs=extattrs) + host = _allocate_host( + hostname=hostname + domain_name, + networks=network_tuple, + cname_aliases=cname_aliases, + dns_view=dns_view, + extattrs=extattrs, + ) if "NETWORK_FULL" not in host and "NETWORK_NOT_FOUND" not in host: break elif "IPV4" in host: ipv4_network_index += 1 - assert oss_ipv4_networks, \ - "No available space in any IPv4 network for this service." - assert ipv4_network_index < len(oss_ipv4_networks), \ - "No available space in any IPv4 network for this service." + assert oss_ipv4_networks, "No available space in any IPv4 network for this service." + assert ipv4_network_index < len( + oss_ipv4_networks + ), "No available space in any IPv4 network for this service." ipv4_network = str(oss_ipv4_networks[ipv4_network_index]) else: # "IPV6" in host ipv6_network_index += 1 - assert oss_ipv6_networks, \ - "No available space in any IPv6 network for this service." - assert ipv6_network_index < len(oss_ipv6_networks), \ - "No available space in any IPv6 network for this service." + assert oss_ipv6_networks, "No available space in any IPv6 network for this service." + assert ipv6_network_index < len( + oss_ipv6_networks + ), "No available space in any IPv6 network for this service." ipv6_network = str(oss_ipv6_networks[ipv6_network_index]) elif service_networks: # IPv4 ipv4_network = service_networks.v4 if oss_ipv4_containers: - assert any(ipv4_network.subnet_of(oss_ipv4_container) - for oss_ipv4_container in oss_ipv4_containers) + assert any(ipv4_network.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers) else: assert ipv4_network in oss_ipv4_networks # IPv6 ipv6_network = service_networks.v6 if oss_ipv6_containers: - assert any(ipv6_network.subnet_of(oss_ipv6_container) - for oss_ipv6_container in oss_ipv6_containers) + assert any(ipv6_network.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers) else: assert ipv6_network in oss_ipv6_networks host = _allocate_host( - hostname=hostname+domain_name, + hostname=hostname + domain_name, networks=(str(ipv4_network), str(ipv6_network)), cname_aliases=cname_aliases, dns_view=dns_view, - extattrs=extattrs + extattrs=extattrs, ) - assert "NETWORK_FULL" not in host, \ - "Network is full." - assert "NETWORK_NOT_FOUND" not in host, \ - "Network does not exist. Create it first." + assert "NETWORK_FULL" not in host, "Network is full." + assert "NETWORK_NOT_FOUND" not in host, "Network does not exist. Create it first." elif host_addresses: ipv4_addr = host_addresses.v4 ipv6_addr = host_addresses.v6 _assert_host_in_service( - ipv4_addr, ipv6_addr, - oss_ipv4_containers, oss_ipv6_containers, - oss_ipv4_networks, oss_ipv6_networks + ipv4_addr, ipv6_addr, oss_ipv4_containers, oss_ipv6_containers, oss_ipv4_networks, oss_ipv6_networks ) host = _allocate_host( - hostname=hostname+domain_name, + hostname=hostname + domain_name, addrs=(str(ipv4_addr), str(ipv6_addr)), cname_aliases=cname_aliases, dns_view=dns_view, - extattrs=extattrs + extattrs=extattrs, ) assert "NETWORK_FULL" not in host return host -def delete_service_network(ipnetwork=None, service_type='' - ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: +def delete_service_network(ipnetwork=None, service_type="") -> Union[V4ServiceNetwork, V6ServiceNetwork]: """ Delete IPv4 or IPv6 network by CIDR. """ @@ -534,8 +474,7 @@ def delete_service_network(ipnetwork=None, service_type='' assert ipam_params.INFOBLOX infoblox_params = ipam_params.INFOBLOX - assert hasattr(ipam_params, service_type) \ - and service_type != 'INFOBLOX', "Invalid service type." + assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." network = str(ipnetwork) ip_version = _ip_network_version(network) @@ -546,43 +485,38 @@ def delete_service_network(ipnetwork=None, service_type='' oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks if oss_ipv4_containers: - assert any(ipnetwork.subnet_of(oss_ipv4_container) - for oss_ipv4_container in oss_ipv4_containers), \ - "Can't delete: network doesn't belong to service type." + assert any( + ipnetwork.subnet_of(oss_ipv4_container) for oss_ipv4_container in oss_ipv4_containers + ), "Can't delete: network doesn't belong to service type." else: - assert ipnetwork in oss_ipv4_networks, \ - "Can't delete: network doesn't belong to service type." + assert ipnetwork in oss_ipv4_networks, "Can't delete: network doesn't belong to service type." else: oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers oss_ipv6_networks = getattr(ipam_params, service_type).V6.networks if oss_ipv6_containers: - assert any(ipnetwork.subnet_of(oss_ipv6_container) - for oss_ipv6_container in oss_ipv6_containers), \ - "Can't delete: network doesn't belong to service type." + assert any( + ipnetwork.subnet_of(oss_ipv6_container) for oss_ipv6_container in oss_ipv6_containers + ), "Can't delete: network doesn't belong to service type." else: - assert ipnetwork in oss_ipv6_networks, \ - "Can't delete: network doesn't belong to service type." + assert ipnetwork in oss_ipv6_networks, "Can't delete: network doesn't belong to service type." network_info = _find_networks(network=network, ip_version=ip_version) assert len(network_info) == 1, "Network does not exist." - assert '_ref' in network_info[0] + assert "_ref" in network_info[0] r = requests.delete( f'{_wapi(infoblox_params)}/{network_info[0]["_ref"]}', - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" # Extract ipv4/ipv6 address from the network reference obtained in the # response r_text = r.text print(r_text) - network_address = ipaddress.ip_network( - r_text.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) + network_address = ipaddress.ip_network(r_text.rsplit("/", 1)[0].split(":")[1].replace("%3A", ":")) if ip_version == 4: return V4ServiceNetwork(v4=ipaddress.ip_network(network_address)) else: @@ -590,10 +524,7 @@ def delete_service_network(ipnetwork=None, service_type='' def delete_service_host( - hostname='', - host_addresses: HostAddresses = None, - cname_aliases=[], - service_type='' + hostname="", host_addresses: HostAddresses = None, cname_aliases=[], service_type="" ) -> Union[V4HostAddress, V6HostAddress]: """ Delete host record and associated CNAME records. @@ -606,8 +537,7 @@ def delete_service_host( assert ipam_params.INFOBLOX infoblox_params = ipam_params.INFOBLOX - assert hasattr(ipam_params, service_type) \ - and service_type != 'INFOBLOX', "Invalid service type." + assert hasattr(ipam_params, service_type) and service_type != "INFOBLOX", "Invalid service type." oss_ipv4_containers = getattr(ipam_params, service_type).V4.containers oss_ipv6_containers = getattr(ipam_params, service_type).V6.containers oss_ipv4_networks = getattr(ipam_params, service_type).V4.networks @@ -618,67 +548,63 @@ def delete_service_host( ipv6_addr = str(host_addresses.v6) _assert_host_in_service( - host_addresses.v4, host_addresses.v6, - oss_ipv4_containers, oss_ipv6_containers, - oss_ipv4_networks, oss_ipv6_networks + host_addresses.v4, + host_addresses.v6, + oss_ipv4_containers, + oss_ipv6_containers, + oss_ipv4_networks, + oss_ipv6_networks, ) # Find host record reference r = requests.get( - f'{_wapi(infoblox_params)}/record:host', + f"{_wapi(infoblox_params)}/record:host", params={ - 'name': (hostname+domain_name).lower(), # hostnames are lowercase - 'ipv4addr': ipv4_addr, - 'ipv6addr': ipv6_addr, - 'view': dns_view, + "name": (hostname + domain_name).lower(), # hostnames are lowercase + "ipv4addr": ipv4_addr, + "ipv6addr": ipv6_addr, + "view": dns_view, }, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, ) host_data = r.json() assert len(host_data) == 1, "Host does not exist." - assert '_ref' in host_data[0] - host_ref = host_data[0]['_ref'] + assert "_ref" in host_data[0] + host_ref = host_data[0]["_ref"] # Find cname records reference r = requests.get( - f'{_wapi(infoblox_params)}/record:cname', + f"{_wapi(infoblox_params)}/record:cname", params={ - 'canonical': hostname+domain_name, + "canonical": hostname + domain_name, "view": dns_view, }, - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, ) cname_data = r.json() provided_cnames = [item + domain_name for item in cname_aliases] - found_cnames = [item['name'] for item in cname_data if 'name' in item] - assert provided_cnames == found_cnames, \ - "Provided CNAME alias names don't match the ones poiting to hostname." + found_cnames = [item["name"] for item in cname_data if "name" in item] + assert provided_cnames == found_cnames, "Provided CNAME alias names don't match the ones poiting to hostname." # Delete the host record r = requests.delete( - f'{_wapi(infoblox_params)}/{host_ref}', - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False + f"{_wapi(infoblox_params)}/{host_ref}", + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" # Delete the CNAME records - cname_refs = [item['_ref'] for item in cname_data if 'name' in item] + cname_refs = [item["_ref"] for item in cname_data if "name" in item] for cname_ref in cname_refs: r = requests.delete( - f'{_wapi(infoblox_params)}/{cname_ref}', - auth=HTTPBasicAuth(infoblox_params.username, - infoblox_params.password), - verify=False + f"{_wapi(infoblox_params)}/{cname_ref}", + auth=HTTPBasicAuth(infoblox_params.username, infoblox_params.password), + verify=False, ) - assert r.status_code >= 200 and r.status_code < 300, \ - f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" + assert r.status_code >= 200 and r.status_code < 300, f"HTTP error {r.status_code}: {r.reason}\n\n{r.text}" return host_addresses diff --git a/gso/services/ipam.py b/gso/services/ipam.py index c438ed7c919a8f35d8e7f47692fca30b7aa89349..0a1b0655cff1d9a28b4ccb628f9fe19bde03fcf4 100644 --- a/gso/services/ipam.py +++ b/gso/services/ipam.py @@ -32,51 +32,43 @@ class HostAddresses(BaseSettings): v6: ipaddress.IPv6Address -def new_service_networks(service_type='', - comment="", - extattrs={}) -> ServiceNetworks: +def new_service_networks(service_type="", comment="", extattrs={}) -> ServiceNetworks: v4_service_network = _ipam.allocate_service_ipv4_network( - service_type=service_type, comment=comment, extattrs=extattrs) + service_type=service_type, comment=comment, extattrs=extattrs + ) v6_service_network = _ipam.allocate_service_ipv6_network( - service_type=service_type, comment=comment, extattrs=extattrs) - return ServiceNetworks( - v4=v4_service_network.v4, - v6=v6_service_network.v6) - - -def new_service_host(hostname, - service_type='', - service_networks: ServiceNetworks = None, - host_addresses: HostAddresses = None, - cname_aliases=None, - extattrs={}) -> HostAddresses: + service_type=service_type, comment=comment, extattrs=extattrs + ) + return ServiceNetworks(v4=v4_service_network.v4, v6=v6_service_network.v6) + + +def new_service_host( + hostname, + service_type="", + service_networks: ServiceNetworks = None, + host_addresses: HostAddresses = None, + cname_aliases=None, + extattrs={}, +) -> HostAddresses: return _ipam.allocate_service_host( hostname=hostname, service_type=service_type, service_networks=service_networks, host_addresses=host_addresses, cname_aliases=cname_aliases, - extattrs=extattrs) + extattrs=extattrs, + ) def delete_service_network( - network: ipaddress.ip_network = None, service_type='' + network: ipaddress.ip_network = None, service_type="" ) -> Union[V4ServiceNetwork, V6ServiceNetwork]: - return _ipam.delete_service_network( - ipnetwork=network, - service_type=service_type - ) + return _ipam.delete_service_network(ipnetwork=network, service_type=service_type) def delete_service_host( - hostname='', - host_addresses: HostAddresses = None, - cname_aliases=[], - service_type='' + hostname="", host_addresses: HostAddresses = None, cname_aliases=[], service_type="" ) -> HostAddresses: return _ipam.delete_service_host( - hostname=hostname, - host_addresses=host_addresses, - cname_aliases=cname_aliases, - service_type=service_type + hostname=hostname, host_addresses=host_addresses, cname_aliases=cname_aliases, service_type=service_type ) diff --git a/gso/services/provisioning_proxy.py b/gso/services/provisioning_proxy.py index 2fe01ffcf0476184fadf73dbe7feae0f4c2b1108..0f61c54c3a29f9f63cbd70326b45c80b5ce7b2d9 100644 --- a/gso/services/provisioning_proxy.py +++ b/gso/services/provisioning_proxy.py @@ -29,16 +29,16 @@ class CUDOperation(strEnum): Enum for different C(R)UD operations that the provisioning proxy supports. Read is not applicable, hence these become CUD and not CRUD operations. """ + #: Creation is done with a POST request - POST = 'POST' + POST = "POST" #: Updating is done with a PUT request - PUT = 'PUT' + PUT = "PUT" #: Removal is done with a DELETE request - DELETE = 'DELETE' + DELETE = "DELETE" -def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, - operation: CUDOperation): +def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, operation: CUDOperation): """ Internal function for sending a request to LSO. The callback address is derived using the process ID provided. @@ -57,13 +57,11 @@ def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, pp_params = oss.PROVISIONING_PROXY assert pp_params - callback_url = f'{settings.load_oss_params().GENERAL.public_hostname}' \ - f'/api/processes/{process_id}/resume' - logger.debug('[provisioning proxy] provisioning for process %s', - process_id) + callback_url = f"{settings.load_oss_params().GENERAL.public_hostname}" f"/api/processes/{process_id}/resume" + logger.debug("[provisioning proxy] provisioning for process %s", process_id) - parameters.update({'callback': callback_url}) - url = f'{pp_params.scheme}://{pp_params.api_base}/api/{endpoint}' + parameters.update({"callback": callback_url}) + url = f"{pp_params.scheme}://{pp_params.api_base}/api/{endpoint}" request = None @@ -79,10 +77,7 @@ def _send_request(endpoint: str, parameters: dict, process_id: UUIDstr, raise AssertionError(request.content) -def provision_device( - subscription: DeviceProvisioning, - process_id: UUIDstr, - dry_run: bool = True): +def provision_device(subscription: DeviceProvisioning, process_id: UUIDstr, dry_run: bool = True): """ Function that provisions a new device using LSO. @@ -92,18 +87,14 @@ def provision_device( :param bool dry_run: A boolean indicating whether this should be a dry run or not, defaults to ``True``. """ - parameters = { - 'dry_run': dry_run, - 'subscription': json.loads(json_dumps(subscription)) - } + parameters = {"dry_run": dry_run, "subscription": json.loads(json_dumps(subscription))} - _send_request('device', parameters, process_id, CUDOperation.POST) + _send_request("device", parameters, process_id, CUDOperation.POST) -def provision_ip_trunk(subscription: IptrunkProvisioning, - process_id: UUIDstr, - config_object: str, - dry_run: bool = True): +def provision_ip_trunk( + subscription: IptrunkProvisioning, process_id: UUIDstr, config_object: str, dry_run: bool = True +): """ Function that provisions an IP trunk service using LSO. @@ -115,13 +106,13 @@ def provision_ip_trunk(subscription: IptrunkProvisioning, or not, defaults to ``True``. """ parameters = { - 'subscription': json.loads(json_dumps(subscription)), - 'dry_run': dry_run, - 'verb': 'deploy', - 'object': config_object + "subscription": json.loads(json_dumps(subscription)), + "dry_run": dry_run, + "verb": "deploy", + "object": config_object, } - _send_request('ip_trunk', parameters, process_id, CUDOperation.POST) + _send_request("ip_trunk", parameters, process_id, CUDOperation.POST) # def modify_ip_trunk(old_subscription: Iptrunk, @@ -149,9 +140,7 @@ def provision_ip_trunk(subscription: IptrunkProvisioning, # _send_request('ip_trunk', parameters, process_id, CUDOperation.PUT) -def deprovision_ip_trunk(subscription: Iptrunk, - process_id: UUIDstr, - dry_run: bool = True): +def deprovision_ip_trunk(subscription: Iptrunk, process_id: UUIDstr, dry_run: bool = True): """ Function that provisions an IP trunk service using LSO. @@ -161,31 +150,25 @@ def deprovision_ip_trunk(subscription: Iptrunk, :param bool dry_run: A boolean indicating whether this should be a dry run or not, defaults to ``True``. """ - parameters = { - 'subscription': json.loads(json_dumps(subscription)), - 'dry_run': dry_run, - 'verb': 'terminate' - } + parameters = {"subscription": json.loads(json_dumps(subscription)), "dry_run": dry_run, "verb": "terminate"} - _send_request('ip_trunk', parameters, process_id, CUDOperation.DELETE) + _send_request("ip_trunk", parameters, process_id, CUDOperation.DELETE) -@inputstep('Await provisioning proxy results', assignee=Assignee('SYSTEM')) -def await_pp_results(subscription: SubscriptionModel, - label_text: str) -> State: +@inputstep("Await provisioning proxy results", assignee=Assignee("SYSTEM")) +def await_pp_results(subscription: SubscriptionModel, label_text: str) -> State: class ProvisioningResultPage(FormPage): class Config: - title = f'Deploying {subscription.product.name}...' + title = f"Deploying {subscription.product.name}..." warning_label: Label = label_text pp_run_results: dict = None - confirm: Accept = Accept('INCOMPLETE') + confirm: Accept = Accept("INCOMPLETE") - @validator('pp_run_results', allow_reuse=True, pre=True, always=True) + @validator("pp_run_results", allow_reuse=True, pre=True, always=True) def run_results_must_be_given(cls, run_results): if run_results is None: - raise ValueError('Run results may not be empty. ' - 'Wait for the provisioning proxy to finish.') + raise ValueError("Run results may not be empty. " "Wait for the provisioning proxy to finish.") return run_results result_page = yield ProvisioningResultPage @@ -193,18 +176,19 @@ def await_pp_results(subscription: SubscriptionModel, return result_page.dict() -@inputstep('Confirm provisioning proxy results', assignee=Assignee('SYSTEM')) +@inputstep("Confirm provisioning proxy results", assignee=Assignee("SYSTEM")) def confirm_pp_results(state: State) -> State: class ConfirmRunPage(FormPage): class Config: - title = f"Execution for " \ - f"{state['subscription']['product']['name']} " \ - f"completed, please confirm the results below." - - run_status: str = ReadOnlyField(state['pp_run_results']['status']) - run_results: LongText = ReadOnlyField( - f"{state['pp_run_results']['output']}") - confirm: Accept = Accept('INCOMPLETE') + title = ( + f"Execution for " + f"{state['subscription']['product']['name']} " + f"completed, please confirm the results below." + ) + + run_status: str = ReadOnlyField(state["pp_run_results"]["status"]) + run_results: LongText = ReadOnlyField(f"{state['pp_run_results']['output']}") + confirm: Accept = Accept("INCOMPLETE") yield ConfirmRunPage diff --git a/gso/services/resource_manager.py b/gso/services/resource_manager.py index 13106d8a91a7c96e73ac6309c1557f9f4b005c74..f13d80c40f23c0efa8b148ee8c74507016fc7058 100644 --- a/gso/services/resource_manager.py +++ b/gso/services/resource_manager.py @@ -4,26 +4,22 @@ from gso import settings def import_new_router(router_name, oss_params=settings.OSSParams): - r = requests.post( - f'{oss_params.RESOURCE_MANAGER_API_PREFIX}' - f'/api/interfaces/initialize-router/{router_name}') + r = requests.post(f"{oss_params.RESOURCE_MANAGER_API_PREFIX}" f"/api/interfaces/initialize-router/{router_name}") r.raise_for_status() def next_lag(router_name, oss_params=settings.OSSParams): - r = requests.post( - f'{oss_params.RESOURCE_MANAGER_API_PREFIX}' - f'/api/interfaces/next-lag/{router_name}') + r = requests.post(f"{oss_params.RESOURCE_MANAGER_API_PREFIX}" f"/api/interfaces/next-lag/{router_name}") r.raise_for_status() response = r.json() - return response['name'] + return response["name"] def next_physical(router_name, lag_name, oss_params=settings.OSSParams): # TODO: speed needed (if first interface) r = requests.post( - f'{oss_params.RESOURCE_MANAGER_API_PREFIX}' - f'/api/interfaces/next-physical/{router_name}/{lag_name}') + f"{oss_params.RESOURCE_MANAGER_API_PREFIX}" f"/api/interfaces/next-physical/{router_name}/{lag_name}" + ) r.raise_for_status() response = r.json() - return response['name'] + return response["name"] diff --git a/gso/settings.py b/gso/settings.py index dd81fe6460030fcf0c64cda4c6eec10ba386757f..dc2b2e832e95a85f1eadd10ca3a3aaf7dfa49b83 100644 --- a/gso/settings.py +++ b/gso/settings.py @@ -13,6 +13,7 @@ class GeneralParams(BaseSettings): """ General parameters for a GSO configuration file. """ + #: The hostname that GSO is publicly served at, used for building the #: callback URL that the provisioning proxy uses. public_hostname: str @@ -22,6 +23,7 @@ class InfoBloxParams(BaseSettings): """ Parameters related to InfoBlox. """ + scheme: str wapi_version: str host: str @@ -33,6 +35,7 @@ class V4NetworkParams(BaseSettings): """ A set of parameters that describe an IPv4 network in InfoBlox. """ + containers: list[ipaddress.IPv4Network] networks: list[ipaddress.IPv4Network] mask: int # TODO: validation on mask? @@ -42,6 +45,7 @@ class V6NetworkParams(BaseSettings): """ A set of parameters that describe an IPv6 network in InfoBlox. """ + containers: list[ipaddress.IPv6Network] networks: list[ipaddress.IPv6Network] mask: int # TODO: validation on mask? @@ -52,6 +56,7 @@ class ServiceNetworkParams(BaseSettings): Parameters for InfoBlox that describe IPv4 and v6 networks, and the corresponding domain name that should be used as a suffix. """ + V4: V4NetworkParams V6: V6NetworkParams domain_name: str @@ -62,6 +67,7 @@ class IPAMParams(BaseSettings): """ A set of parameters related to IPAM. """ + INFOBLOX: InfoBloxParams LO: ServiceNetworkParams TRUNK: ServiceNetworkParams @@ -74,6 +80,7 @@ class ProvisioningProxyParams(BaseSettings): """ Parameters for the provisioning proxy. """ + scheme: str api_base: str auth: str # FIXME: unfinished @@ -84,6 +91,7 @@ class OSSParams(BaseSettings): """ The set of parameters required for running GSO. """ + GENERAL: GeneralParams IPAM: IPAMParams RESOURCE_MANAGER_API_PREFIX: str @@ -95,9 +103,9 @@ def load_oss_params() -> OSSParams: look for OSS_PARAMS_FILENAME in the environment and load the parameters from that file. """ - with open(os.environ['OSS_PARAMS_FILENAME'], encoding='utf-8') as file: + with open(os.environ["OSS_PARAMS_FILENAME"], encoding="utf-8") as file: return OSSParams(**json.loads(file.read())) -if __name__ == '__main__': +if __name__ == "__main__": print(load_oss_params()) diff --git a/gso/workflows/__init__.py b/gso/workflows/__init__.py index 61f560b81b02bf2e5c4ed7df0d89a92ff4171c86..aa7685d8607d855edfd20f16f3810e7950435c4b 100644 --- a/gso/workflows/__init__.py +++ b/gso/workflows/__init__.py @@ -4,16 +4,11 @@ init class that imports all workflows into GSO. from orchestrator.workflows import LazyWorkflowInstance LazyWorkflowInstance("gso.workflows.device.create_device", "create_device") -LazyWorkflowInstance( - "gso.workflows.device.terminate_device", "terminate_device" -) +LazyWorkflowInstance("gso.workflows.device.terminate_device", "terminate_device") LazyWorkflowInstance("gso.workflows.device.get_facts", "get_facts") LazyWorkflowInstance("gso.workflows.iptrunk.create_iptrunk", "create_iptrunk") LazyWorkflowInstance("gso.workflows.iptrunk.modify_generic", "modify_generic") -LazyWorkflowInstance("gso.workflows.iptrunk.terminate_iptrunk", - "terminate_iptrunk") -LazyWorkflowInstance("gso.workflows.iptrunk.modify_isis_metric", - "modify_isis_metric") -LazyWorkflowInstance("gso.workflows.iptrunk.modify_generic", - "modify_generic") +LazyWorkflowInstance("gso.workflows.iptrunk.terminate_iptrunk", "terminate_iptrunk") +LazyWorkflowInstance("gso.workflows.iptrunk.modify_isis_metric", "modify_isis_metric") +LazyWorkflowInstance("gso.workflows.iptrunk.modify_generic", "modify_generic") LazyWorkflowInstance("gso.workflows.site.create_site", "create_site") diff --git a/gso/workflows/device/create_device.py b/gso/workflows/device/create_device.py index dba6510dd4ab4bb9b97c65321bdab5409a83fd3f..70381f6b2abfa06c95298d673ae072097b1db88e 100644 --- a/gso/workflows/device/create_device.py +++ b/gso/workflows/device/create_device.py @@ -29,9 +29,7 @@ def site_selector() -> Choice: ProductTable.product_type == "Site", SubscriptionTable.status == "active", ) - .with_entities( - SubscriptionTable.subscription_id, SubscriptionTable.description - ) + .with_entities(SubscriptionTable.subscription_id, SubscriptionTable.description) .all() ): site_subscriptions[str(site_id)] = site_description @@ -82,48 +80,36 @@ def iso_from_ipv4(ipv4_address): def get_info_from_ipam(subscription: DeviceProvisioning) -> State: lo0_alias = re.sub(".geant.net", "", subscription.device.device_fqdn) lo0_name = f"lo0.{lo0_alias}" - lo0_addr = _ipam.allocate_service_host( - hostname=lo0_name, service_type="LO", cname_aliases=[lo0_alias] - ) + lo0_addr = _ipam.allocate_service_host(hostname=lo0_name, service_type="LO", cname_aliases=[lo0_alias]) subscription.device.device_lo_ipv4_address = lo0_addr.v4 subscription.device.device_lo_ipv6_address = lo0_addr.v6 - subscription.device.device_lo_iso_address = iso_from_ipv4( - str(subscription.device.device_lo_ipv4_address) - ) - subscription.device.device_si_ipv4_network = ( - _ipam.allocate_service_ipv4_network( - service_type="SI", comment=f"SI for {lo0_name}" - ).v4 - ) - subscription.device.device_ias_lt_ipv4_network = ( - _ipam.allocate_service_ipv4_network( - service_type="LT_IAS", comment=f"LT for {lo0_name}" - ).v4 - ) - subscription.device.device_ias_lt_ipv6_network = ( - _ipam.allocate_service_ipv6_network( - service_type="LT_IAS", comment=f"LT for {lo0_name}" - ).v6 - ) + subscription.device.device_lo_iso_address = iso_from_ipv4(str(subscription.device.device_lo_ipv4_address)) + subscription.device.device_si_ipv4_network = _ipam.allocate_service_ipv4_network( + service_type="SI", comment=f"SI for {lo0_name}" + ).v4 + subscription.device.device_ias_lt_ipv4_network = _ipam.allocate_service_ipv4_network( + service_type="LT_IAS", comment=f"LT for {lo0_name}" + ).v4 + subscription.device.device_ias_lt_ipv6_network = _ipam.allocate_service_ipv6_network( + service_type="LT_IAS", comment=f"LT for {lo0_name}" + ).v6 return {"subscription": subscription} @step("Initialize subscription") def initialize_subscription( - subscription: device.DeviceInactive, - hostname: str, - ts_address: ipaddress.IPv4Address, - ts_port: str, - device_vendor: device_pb.DeviceVendor, - device_site: str, - device_role: device_pb.DeviceRole, + subscription: device.DeviceInactive, + hostname: str, + ts_address: ipaddress.IPv4Address, + ts_port: str, + device_vendor: device_pb.DeviceVendor, + device_site: str, + device_role: device_pb.DeviceRole, ) -> State: subscription.device.device_ts_address = str(ts_address) subscription.device.device_ts_port = str(ts_port) subscription.device.device_vendor = device_vendor - subscription.device.device_site = Site.from_subscription( - device_site - ).site + subscription.device.device_site = Site.from_subscription(device_site).site fqdn = ( f"{hostname}.{subscription.device.device_site.site_name.lower()}." f"{subscription.device.device_site.site_country_code.lower()}" @@ -134,17 +120,13 @@ def initialize_subscription( subscription.device.device_access_via_ts = True subscription.description = f"Device {fqdn} ({subscription.device_type})" - subscription = device.DeviceProvisioning.from_other_lifecycle( - subscription, SubscriptionLifecycle.PROVISIONING - ) + subscription = device.DeviceProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING) return {"subscription": subscription} @step("Provision device [DRY RUN]") -def provision_device_dry( - subscription: DeviceProvisioning, process_id: UUIDstr -) -> State: +def provision_device_dry(subscription: DeviceProvisioning, process_id: UUIDstr) -> State: provisioning_proxy.provision_device(subscription, process_id) return { @@ -160,9 +142,7 @@ def provision_device_dry( @step("Provision device [FOR REAL]") -def provision_device_real( - subscription: DeviceProvisioning, process_id: UUIDstr -) -> State: +def provision_device_real(subscription: DeviceProvisioning, process_id: UUIDstr) -> State: provisioning_proxy.provision_device(subscription, process_id, False) return { @@ -179,25 +159,23 @@ def provision_device_real( @workflow( "Create device", - initial_input_form=wrap_create_initial_input_form( - initial_input_form_generator - ), + initial_input_form=wrap_create_initial_input_form(initial_input_form_generator), target=Target.CREATE, ) def create_device(): return ( - init - >> create_subscription - >> store_process_subscription(Target.CREATE) - >> initialize_subscription - >> get_info_from_ipam - >> provision_device_dry - >> await_pp_results - >> confirm_pp_results - >> provision_device_real - >> await_pp_results - >> confirm_pp_results - >> set_status(SubscriptionLifecycle.ACTIVE) - >> resync - >> done + init + >> create_subscription + >> store_process_subscription(Target.CREATE) + >> initialize_subscription + >> get_info_from_ipam + >> provision_device_dry + >> await_pp_results + >> confirm_pp_results + >> provision_device_real + >> await_pp_results + >> confirm_pp_results + >> set_status(SubscriptionLifecycle.ACTIVE) + >> resync + >> done ) diff --git a/gso/workflows/device/get_facts.py b/gso/workflows/device/get_facts.py index a50caee3a8fe3eba6811107615d1eca0f2ea58c4..ada97fe114045a173ba0d08b8a2e34d29e39a082 100644 --- a/gso/workflows/device/get_facts.py +++ b/gso/workflows/device/get_facts.py @@ -17,9 +17,7 @@ from orchestrator.workflows.utils import wrap_modify_initial_input_form from gso.products.product_types.device import Device -def initial_input_form_generator( - subscription_id: UUIDstr, organisation: UUIDstr -) -> InputForm: +def initial_input_form_generator(subscription_id: UUIDstr, organisation: UUIDstr) -> InputForm: subscription = Device.from_subscription(subscription_id) class TerminateForm(FormPage): @@ -47,9 +45,7 @@ def get_facts(subscription_id) -> None: @workflow( "Get Facts from Device", - initial_input_form=wrap_modify_initial_input_form( - initial_input_form_generator - ), + initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator), target=Target.SYSTEM, ) def get_facts_from_device(): diff --git a/gso/workflows/device/terminate_device.py b/gso/workflows/device/terminate_device.py index cd56cd1477dfce1a719afcf8657e4619855cbf2b..ed2e7bcde19095125391c09ee2c2c141b2d5271f 100644 --- a/gso/workflows/device/terminate_device.py +++ b/gso/workflows/device/terminate_device.py @@ -16,9 +16,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> InputForm: subscription = Device.from_subscription(subscription_id) class TerminateForm(FormPage): - are_you_sure: Label = ( - f"Are you sure you want to remove {subscription.description}?" - ) + are_you_sure: Label = f"Are you sure you want to remove {subscription.description}?" return TerminateForm @@ -34,13 +32,7 @@ def deprovision_loopback_ips(subscription: Device) -> None: v6=ipaddress.ip_address(subscription.device.device_lo_ipv6_address), ) fqdn_as_list = subscription.device.device_fqdn.split(".") - hostname = ( - str(fqdn_as_list[0]) - + "." - + str(fqdn_as_list[1]) - + "." - + str(fqdn_as_list[2]) - ) + hostname = str(fqdn_as_list[0]) + "." + str(fqdn_as_list[1]) + "." + str(fqdn_as_list[2]) lo0_name = "lo0." + hostname host_addresses = ipam.delete_service_host( hostname=lo0_name, @@ -54,9 +46,7 @@ def deprovision_loopback_ips(subscription: Device) -> None: @step("Deprovision SI- interface IPs from IPAM/DNS") def deprovision_si_ips(subscription: Device) -> None: service_network = ipam.delete_service_network( - network=ipaddress.ip_network( - subscription.device.device_si_ipv4_network - ), + network=ipaddress.ip_network(subscription.device.device_si_ipv4_network), service_type="SI", ) return {"service_network": service_network} @@ -65,15 +55,11 @@ def deprovision_si_ips(subscription: Device) -> None: @step("Deprovision LT- interface (IAS) IPs from IPAM/DNS") def deprovision_lt_ips(subscription: Device) -> None: service_network_v4 = ipam.delete_service_network( - network=ipaddress.ip_network( - subscription.device.device_ias_lt_ipv4_network - ), + network=ipaddress.ip_network(subscription.device.device_ias_lt_ipv4_network), service_type="LT_IAS", ) service_network_v6 = ipam.delete_service_network( - network=ipaddress.ip_network( - subscription.device.device_ias_lt_ipv6_network - ), + network=ipaddress.ip_network(subscription.device.device_ias_lt_ipv6_network), service_type="LT_IAS", ) return { @@ -84,9 +70,7 @@ def deprovision_lt_ips(subscription: Device) -> None: @workflow( "Terminate device", - initial_input_form=wrap_modify_initial_input_form( - initial_input_form_generator - ), + initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator), target=Target.TERMINATE, ) def terminate_device(): diff --git a/gso/workflows/iptrunk/create_iptrunk.py b/gso/workflows/iptrunk/create_iptrunk.py index 127caf0ade9912dcceb5c0da54e31d7bd20cb11c..46fcbb310f8c88f31329face548432f09f578a67 100644 --- a/gso/workflows/iptrunk/create_iptrunk.py +++ b/gso/workflows/iptrunk/create_iptrunk.py @@ -30,9 +30,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: ProductTable.product_type == "Device", SubscriptionTable.status == "active", ) - .with_entities( - SubscriptionTable.subscription_id, SubscriptionTable.description - ) + .with_entities(SubscriptionTable.subscription_id, SubscriptionTable.description) .all() ): devices[str(device_id)] = device_description @@ -52,8 +50,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: class AeMembersListA(UniqueConstrainedList[str]): min_items = initial_user_input.iptrunk_minimum_links - DeviceEnumA = Choice("Select a device", zip(devices.keys(), - devices.items())) + DeviceEnumA = Choice("Select a device", zip(devices.keys(), devices.items())) class CreateIptrunkSideAForm(FormPage): class Config: @@ -69,8 +66,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: # We remove the selected device for side A, to prevent any loops devices.pop(str(user_input_side_a.iptrunk_sideA_node_id.name)) - DeviceEnumB = Choice("Select a device", zip(devices.keys(), - devices.items())) + DeviceEnumB = Choice("Select a device", zip(devices.keys(), devices.items())) class AeMembersListB(UniqueConstrainedList[str]): min_items = len(user_input_side_a.iptrunk_sideA_ae_members) @@ -88,11 +84,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator: user_input_side_b = yield CreateIptrunkSideBForm - return ( - initial_user_input.dict() - | user_input_side_a.dict() - | user_input_side_b.dict() - ) + return initial_user_input.dict() | user_input_side_a.dict() | user_input_side_b.dict() @step("Create subscription") @@ -108,18 +100,14 @@ def create_subscription(product: UUIDstr) -> State: @step("Get information from IPAM") def get_info_from_ipam(subscription: IptrunkProvisioning) -> State: # TODO: get info about how these should be generated - subscription.iptrunk.iptrunk_ipv4_network = ( - _ipam.allocate_service_ipv4_network( - service_type="TRUNK", - comment=subscription.iptrunk.iptrunk_description, - ).v4 - ) - subscription.iptrunk.iptrunk_ipv6_network = ( - _ipam.allocate_service_ipv6_network( - service_type="TRUNK", - comment=subscription.iptrunk.iptrunk_description, - ).v6 - ) + subscription.iptrunk.iptrunk_ipv4_network = _ipam.allocate_service_ipv4_network( + service_type="TRUNK", + comment=subscription.iptrunk.iptrunk_description, + ).v4 + subscription.iptrunk.iptrunk_ipv6_network = _ipam.allocate_service_ipv6_network( + service_type="TRUNK", + comment=subscription.iptrunk.iptrunk_description, + ).v6 return {"subscription": subscription} @@ -149,179 +137,107 @@ def initialize_subscription( subscription.iptrunk.iptrunk_isis_metric = 9000 subscription.iptrunk.iptrunk_minimum_links = iptrunk_minimum_links - subscription.iptrunk.iptrunk_sideA_node = Device.from_subscription( - iptrunk_sideA_node_id - ).device + subscription.iptrunk.iptrunk_sideA_node = Device.from_subscription(iptrunk_sideA_node_id).device subscription.iptrunk.iptrunk_sideA_ae_iface = iptrunk_sideA_ae_iface - subscription.iptrunk.iptrunk_sideA_ae_geant_a_sid = ( - iptrunk_sideA_ae_geant_a_sid - ) + subscription.iptrunk.iptrunk_sideA_ae_geant_a_sid = iptrunk_sideA_ae_geant_a_sid subscription.iptrunk.iptrunk_sideA_ae_members = iptrunk_sideA_ae_members - subscription.iptrunk.iptrunk_sideA_ae_members_description = ( - iptrunk_sideA_ae_members_descriptions - ) + subscription.iptrunk.iptrunk_sideA_ae_members_description = iptrunk_sideA_ae_members_descriptions - subscription.iptrunk.iptrunk_sideB_node = Device.from_subscription( - iptrunk_sideB_node_id - ).device + subscription.iptrunk.iptrunk_sideB_node = Device.from_subscription(iptrunk_sideB_node_id).device subscription.iptrunk.iptrunk_sideB_ae_iface = iptrunk_sideB_ae_iface - subscription.iptrunk.iptrunk_sideB_ae_geant_a_sid = ( - iptrunk_sideB_ae_geant_a_sid - ) + subscription.iptrunk.iptrunk_sideB_ae_geant_a_sid = iptrunk_sideB_ae_geant_a_sid subscription.iptrunk.iptrunk_sideB_ae_members = iptrunk_sideB_ae_members - subscription.iptrunk.iptrunk_sideB_ae_members_description = ( - iptrunk_sideB_ae_members_descriptions - ) + subscription.iptrunk.iptrunk_sideB_ae_members_description = iptrunk_sideB_ae_members_descriptions subscription.description = f"IP trunk, geant_s_sid:{geant_s_sid}" - subscription = IptrunkProvisioning.from_other_lifecycle( - subscription, SubscriptionLifecycle.PROVISIONING - ) + subscription = IptrunkProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING) return {"subscription": subscription} @step("Provision IP trunk interface [DRY RUN]") -def provision_ip_trunk_iface_dry( - subscription: IptrunkProvisioning, process_id: UUIDstr -) -> State: - provisioning_proxy.provision_ip_trunk( - subscription, process_id, "trunk_interface" - ) +def provision_ip_trunk_iface_dry(subscription: IptrunkProvisioning, process_id: UUIDstr) -> State: + provisioning_proxy.provision_ip_trunk(subscription, process_id, "trunk_interface") return { "subscription": subscription, - "label_text": ( - "Provision of the Trunk interface [DRY] " - "Please refresh to get the results of the playbook" - ), + "label_text": ("Provision of the Trunk interface [DRY] " "Please refresh to get the results of the playbook"), } @step("Provision IP trunk interface [FOR REAL]") -def provision_ip_trunk_iface_real( - subscription: IptrunkProvisioning, process_id: UUIDstr -) -> State: - provisioning_proxy.provision_ip_trunk( - subscription, process_id, "trunk_interface", False - ) +def provision_ip_trunk_iface_real(subscription: IptrunkProvisioning, process_id: UUIDstr) -> State: + provisioning_proxy.provision_ip_trunk(subscription, process_id, "trunk_interface", False) return { "subscription": subscription, - "label_text": ( - "Provision of the Trunk interface [REAL] " - "Please refresh to get the results of the playbook" - ), + "label_text": ("Provision of the Trunk interface [REAL] " "Please refresh to get the results of the playbook"), } @step("Provision IP trunk ISIS interface [DRY RUN]") -def provision_ip_trunk_isis_iface_dry( - subscription: IptrunkProvisioning, process_id: UUIDstr -) -> State: - provisioning_proxy.provision_ip_trunk( - subscription, process_id, "isis_interface" - ) +def provision_ip_trunk_isis_iface_dry(subscription: IptrunkProvisioning, process_id: UUIDstr) -> State: + provisioning_proxy.provision_ip_trunk(subscription, process_id, "isis_interface") return { "subscription": subscription, - "label_text": ( - "Provision of the ISIS interface [DRY]" - "Please refresh to get the results of the playbook" - ), + "label_text": ("Provision of the ISIS interface [DRY]" "Please refresh to get the results of the playbook"), } @step("Provision IP trunk ISIS interface [FOR REAL]") -def provision_ip_trunk_isis_iface_real( - subscription: IptrunkProvisioning, process_id: UUIDstr -) -> State: - provisioning_proxy.provision_ip_trunk( - subscription, process_id, "isis_interface", False - ) +def provision_ip_trunk_isis_iface_real(subscription: IptrunkProvisioning, process_id: UUIDstr) -> State: + provisioning_proxy.provision_ip_trunk(subscription, process_id, "isis_interface", False) return { "subscription": subscription, - "label_text": ( - "Provision of the ISIS interface [REAL]" - "Please refresh to get the results of the playbook" - ), + "label_text": ("Provision of the ISIS interface [REAL]" "Please refresh to get the results of the playbook"), } @step("Provision IP trunk LDP interface [DRY RUN]") -def provision_ip_trunk_ldp_iface_dry( - subscription: IptrunkProvisioning, process_id: UUIDstr -) -> State: - provisioning_proxy.provision_ip_trunk( - subscription, process_id, "ldp_interface" - ) +def provision_ip_trunk_ldp_iface_dry(subscription: IptrunkProvisioning, process_id: UUIDstr) -> State: + provisioning_proxy.provision_ip_trunk(subscription, process_id, "ldp_interface") return { "subscription": subscription, - "label_text": ( - "Provision of the LDP interface [DRY]" - "Please refresh to get the results of the playbook" - ), + "label_text": ("Provision of the LDP interface [DRY]" "Please refresh to get the results of the playbook"), } @step("Provision IP trunk LDP interface [FOR REAL]") -def provision_ip_trunk_ldp_iface_real( - subscription: IptrunkProvisioning, process_id: UUIDstr -) -> State: - provisioning_proxy.provision_ip_trunk( - subscription, process_id, "ldp_interface", False - ) +def provision_ip_trunk_ldp_iface_real(subscription: IptrunkProvisioning, process_id: UUIDstr) -> State: + provisioning_proxy.provision_ip_trunk(subscription, process_id, "ldp_interface", False) return { "subscription": subscription, - "label_text": ( - "Provision of the LDP interface [REAL]" - "Please refresh to get the results of the playbook" - ), + "label_text": ("Provision of the LDP interface [REAL]" "Please refresh to get the results of the playbook"), } @step("Provision IP trunk LLDP interface [DRY RUN]") -def provision_ip_trunk_lldp_iface_dry( - subscription: IptrunkProvisioning, process_id: UUIDstr -) -> State: - provisioning_proxy.provision_ip_trunk( - subscription, process_id, "lldp_interface" - ) +def provision_ip_trunk_lldp_iface_dry(subscription: IptrunkProvisioning, process_id: UUIDstr) -> State: + provisioning_proxy.provision_ip_trunk(subscription, process_id, "lldp_interface") return { "subscription": subscription, - "label_text": ( - "Provision of the LLDP interface [DRY]" - "Please refresh to get the results of the playbook" - ), + "label_text": ("Provision of the LLDP interface [DRY]" "Please refresh to get the results of the playbook"), } @step("Provision IP trunk LLDP interface [FOR REAL]") -def provision_ip_trunk_lldp_iface_real( - subscription: IptrunkProvisioning, process_id: UUIDstr -) -> State: - provisioning_proxy.provision_ip_trunk( - subscription, process_id, "lldp_interface", False - ) +def provision_ip_trunk_lldp_iface_real(subscription: IptrunkProvisioning, process_id: UUIDstr) -> State: + provisioning_proxy.provision_ip_trunk(subscription, process_id, "lldp_interface", False) return { "subscription": subscription, - "label_text": ( - "Provision of the LLDP interface [REAL]" - "Please refresh to get the results of the playbook" - ), + "label_text": ("Provision of the LLDP interface [REAL]" "Please refresh to get the results of the playbook"), } @workflow( "Create IP trunk", - initial_input_form=wrap_create_initial_input_form( - initial_input_form_generator - ), + initial_input_form=wrap_create_initial_input_form(initial_input_form_generator), target=Target.CREATE, ) def create_iptrunk(): diff --git a/gso/workflows/iptrunk/modify_generic.py b/gso/workflows/iptrunk/modify_generic.py index c4745e1f8c401e7d7e5b4ddc2d07de311db97274..92b245ae9f0efbe1888874f69eaa2abdce0366cc 100644 --- a/gso/workflows/iptrunk/modify_generic.py +++ b/gso/workflows/iptrunk/modify_generic.py @@ -24,15 +24,9 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: iptrunk_type: IptrunkType = subscription.iptrunk.iptrunk_type iptrunk_speed: PhyPortCapacity = subscription.iptrunk.iptrunk_speed iptrunk_minimum_links: int = subscription.iptrunk.iptrunk_minimum_links - iptrunk_isis_metric: int = ReadOnlyField( - subscription.iptrunk.iptrunk_isis_metric - ) - iptrunk_ipv4_network: ipaddress.IPv4Network = ReadOnlyField( - subscription.iptrunk.iptrunk_ipv4_network - ) - iptrunk_ipv6_network: ipaddress.IPv6Network = ReadOnlyField( - subscription.iptrunk.iptrunk_ipv6_network - ) + iptrunk_isis_metric: int = ReadOnlyField(subscription.iptrunk.iptrunk_isis_metric) + iptrunk_ipv4_network: ipaddress.IPv4Network = ReadOnlyField(subscription.iptrunk.iptrunk_ipv4_network) + iptrunk_ipv6_network: ipaddress.IPv6Network = ReadOnlyField(subscription.iptrunk.iptrunk_ipv6_network) initial_user_input = yield ModifyIptrunkForm @@ -43,18 +37,10 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: class Config: title = "Provide subscription details for side A of the trunk." - iptrunk_sideA_node: str = ReadOnlyField( - subscription.iptrunk.iptrunk_sideA_node.device_fqdn - ) - iptrunk_sideA_ae_iface: str = ReadOnlyField( - subscription.iptrunk.iptrunk_sideA_ae_iface - ) - iptrunk_sideA_ae_geant_a_sid: str = ( - subscription.iptrunk.iptrunk_sideA_ae_geant_a_sid - ) - iptrunk_sideA_ae_members: AeMembersListA = ( - subscription.iptrunk.iptrunk_sideA_ae_members - ) + iptrunk_sideA_node: str = ReadOnlyField(subscription.iptrunk.iptrunk_sideA_node.device_fqdn) + iptrunk_sideA_ae_iface: str = ReadOnlyField(subscription.iptrunk.iptrunk_sideA_ae_iface) + iptrunk_sideA_ae_geant_a_sid: str = subscription.iptrunk.iptrunk_sideA_ae_geant_a_sid + iptrunk_sideA_ae_members: AeMembersListA = subscription.iptrunk.iptrunk_sideA_ae_members iptrunk_sideA_ae_members_descriptions: AeMembersListA = ( subscription.iptrunk.iptrunk_sideA_ae_members_description ) @@ -69,29 +55,17 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: class Config: title = "Provide subscription details for side B of the trunk." - iptrunk_sideB_node: str = ReadOnlyField( - subscription.iptrunk.iptrunk_sideB_node.device_fqdn - ) - iptrunk_sideB_ae_iface: str = ReadOnlyField( - subscription.iptrunk.iptrunk_sideB_ae_iface - ) - iptrunk_sideB_ae_geant_a_sid: str = ( - subscription.iptrunk.iptrunk_sideB_ae_geant_a_sid - ) - iptrunk_sideB_ae_members: AeMembersListB = ( - subscription.iptrunk.iptrunk_sideB_ae_members - ) + iptrunk_sideB_node: str = ReadOnlyField(subscription.iptrunk.iptrunk_sideB_node.device_fqdn) + iptrunk_sideB_ae_iface: str = ReadOnlyField(subscription.iptrunk.iptrunk_sideB_ae_iface) + iptrunk_sideB_ae_geant_a_sid: str = subscription.iptrunk.iptrunk_sideB_ae_geant_a_sid + iptrunk_sideB_ae_members: AeMembersListB = subscription.iptrunk.iptrunk_sideB_ae_members iptrunk_sideB_ae_members_descriptions: AeMembersListB = ( subscription.iptrunk.iptrunk_sideB_ae_members_description ) user_input_side_b = yield ModifyIptrunkSideBForm - return ( - initial_user_input.dict() - | user_input_side_a.dict() - | user_input_side_b.dict() - ) + return initial_user_input.dict() | user_input_side_a.dict() | user_input_side_b.dict() @step("Update subscription") @@ -115,21 +89,13 @@ def modify_iptrunk_subscription( subscription.iptrunk.iptrunk_speed = iptrunk_speed subscription.iptrunk.iptrunk_minimum_links = iptrunk_minimum_links - subscription.iptrunk.iptrunk_sideA_ae_geant_a_sid = ( - iptrunk_sideA_ae_geant_a_sid - ) + subscription.iptrunk.iptrunk_sideA_ae_geant_a_sid = iptrunk_sideA_ae_geant_a_sid subscription.iptrunk.iptrunk_sideA_ae_members = iptrunk_sideA_ae_members - subscription.iptrunk.iptrunk_sideA_ae_members_description = ( - iptrunk_sideA_ae_members_descriptions - ) + subscription.iptrunk.iptrunk_sideA_ae_members_description = iptrunk_sideA_ae_members_descriptions - subscription.iptrunk.iptrunk_sideB_ae_geant_a_sid = ( - iptrunk_sideB_ae_geant_a_sid - ) + subscription.iptrunk.iptrunk_sideB_ae_geant_a_sid = iptrunk_sideB_ae_geant_a_sid subscription.iptrunk.iptrunk_sideB_ae_members = iptrunk_sideB_ae_members - subscription.iptrunk.iptrunk_sideB_ae_members_description = ( - iptrunk_sideB_ae_members_descriptions - ) + subscription.iptrunk.iptrunk_sideB_ae_members_description = iptrunk_sideB_ae_members_descriptions subscription.description = f"IP trunk, geant_s_sid:{geant_s_sid}" @@ -137,78 +103,48 @@ def modify_iptrunk_subscription( @step("Provision IP trunk interface [DRY RUN]") -def provision_ip_trunk_iface_dry( - subscription: Iptrunk, process_id: UUIDstr -) -> State: - provisioning_proxy.provision_ip_trunk( - subscription, process_id, "trunk_interface" - ) +def provision_ip_trunk_iface_dry(subscription: Iptrunk, process_id: UUIDstr) -> State: + provisioning_proxy.provision_ip_trunk(subscription, process_id, "trunk_interface") return { "subscription": subscription, - "label_text": ( - "Provision of the Trunk interface [DRY] " - "Please refresh to get the results of the playbook" - ), + "label_text": ("Provision of the Trunk interface [DRY] " "Please refresh to get the results of the playbook"), } @step("Provision IP trunk interface [FOR REAL]") -def provision_ip_trunk_iface_real( - subscription: Iptrunk, process_id: UUIDstr -) -> State: - provisioning_proxy.provision_ip_trunk( - subscription, process_id, "trunk_interface", False - ) +def provision_ip_trunk_iface_real(subscription: Iptrunk, process_id: UUIDstr) -> State: + provisioning_proxy.provision_ip_trunk(subscription, process_id, "trunk_interface", False) return { "subscription": subscription, - "label_text": ( - "Provision of the Trunk interface [REAL] " - "Please refresh to get the results of the playbook" - ), + "label_text": ("Provision of the Trunk interface [REAL] " "Please refresh to get the results of the playbook"), } @step("Provision IP trunk LLDP interface [DRY RUN]") -def provision_ip_trunk_lldp_iface_dry( - subscription: Iptrunk, process_id: UUIDstr -) -> State: - provisioning_proxy.provision_ip_trunk( - subscription, process_id, "lldp_interface" - ) +def provision_ip_trunk_lldp_iface_dry(subscription: Iptrunk, process_id: UUIDstr) -> State: + provisioning_proxy.provision_ip_trunk(subscription, process_id, "lldp_interface") return { "subscription": subscription, - "label_text": ( - "Provision of the LLDP interface [DRY]" - "Please refresh to get the results of the playbook" - ), + "label_text": ("Provision of the LLDP interface [DRY]" "Please refresh to get the results of the playbook"), } @step("Provision IP trunk LLDP interface [FOR REAL]") -def provision_ip_trunk_lldp_iface_real( - subscription: Iptrunk, process_id: UUIDstr -) -> State: - provisioning_proxy.provision_ip_trunk( - subscription, process_id, "lldp_interface", False - ) +def provision_ip_trunk_lldp_iface_real(subscription: Iptrunk, process_id: UUIDstr) -> State: + provisioning_proxy.provision_ip_trunk(subscription, process_id, "lldp_interface", False) return { "subscription": subscription, - "label_text": ( - "Provision of the LLDP interface [REAL]" - "Please refresh to get the results of the playbook" - ), + "label_text": ("Provision of the LLDP interface [REAL]" "Please refresh to get the results of the playbook"), } @workflow( "Modify IP trunk", - initial_input_form=wrap_modify_initial_input_form( - initial_input_form_generator - ), + initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator), target=Target.MODIFY, ) def modify_generic(): diff --git a/gso/workflows/iptrunk/modify_isis_metric.py b/gso/workflows/iptrunk/modify_isis_metric.py index e8911c76d2f96351a0dc7b912f09eabd5b65fb28..17beb8aa7335d5adb7aa4adcb18861335c5361ba 100644 --- a/gso/workflows/iptrunk/modify_isis_metric.py +++ b/gso/workflows/iptrunk/modify_isis_metric.py @@ -22,21 +22,15 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: @step("Update subscription") -def modify_iptrunk_subscription( - subscription: Iptrunk, isis_metric: int -) -> State: +def modify_iptrunk_subscription(subscription: Iptrunk, isis_metric: int) -> State: subscription.iptrunk.iptrunk_isis_metric = isis_metric return {"subscription": subscription} @step("Provision IP trunk ISIS interface [DRY RUN]") -def provision_ip_trunk_isis_iface_dry( - subscription: Iptrunk, process_id: UUIDstr -) -> State: - provisioning_proxy.provision_ip_trunk( - subscription, process_id, "isis_interface" - ) +def provision_ip_trunk_isis_iface_dry(subscription: Iptrunk, process_id: UUIDstr) -> State: + provisioning_proxy.provision_ip_trunk(subscription, process_id, "isis_interface") return { "subscription": subscription, @@ -51,12 +45,8 @@ def provision_ip_trunk_isis_iface_dry( @step("Provision IP trunk ISIS interface [FOR REAL]") -def provision_ip_trunk_isis_iface_real( - subscription: Iptrunk, process_id: UUIDstr -) -> State: - provisioning_proxy.provision_ip_trunk( - subscription, process_id, "isis_interface", False - ) +def provision_ip_trunk_isis_iface_real(subscription: Iptrunk, process_id: UUIDstr) -> State: + provisioning_proxy.provision_ip_trunk(subscription, process_id, "isis_interface", False) return { "subscription": subscription, @@ -72,9 +62,7 @@ def provision_ip_trunk_isis_iface_real( @workflow( "Modify IP trunk", - initial_input_form=wrap_modify_initial_input_form( - initial_input_form_generator - ), + initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator), target=Target.MODIFY, ) def modify_isis_metric(): diff --git a/gso/workflows/iptrunk/terminate_iptrunk.py b/gso/workflows/iptrunk/terminate_iptrunk.py index ba1b9aa03ac9f3e46a7a6c970e2b6758394cef94..471b9c3e55ebc72d35ea8f5b916ada4847753a16 100644 --- a/gso/workflows/iptrunk/terminate_iptrunk.py +++ b/gso/workflows/iptrunk/terminate_iptrunk.py @@ -18,9 +18,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> InputForm: subscription = Iptrunk.from_subscription(subscription_id) class TerminateForm(FormPage): - are_you_sure: Label = ( - f"Are you sure you want to remove {subscription.description}?" - ) + are_you_sure: Label = f"Are you sure you want to remove {subscription.description}?" return TerminateForm @@ -33,12 +31,8 @@ def modify_iptrunk_subscription(subscription: Iptrunk) -> State: @step("Drain traffic from trunk") -def drain_traffic_from_ip_trunk( - subscription: Iptrunk, process_id: UUIDstr -) -> State: - provisioning_proxy.provision_ip_trunk( - subscription, process_id, "isis_interface", False - ) +def drain_traffic_from_ip_trunk(subscription: Iptrunk, process_id: UUIDstr) -> State: + provisioning_proxy.provision_ip_trunk(subscription, process_id, "isis_interface", False) return { "subscription": subscription, "label_text": "This is setting the ISIS metric of the trunk to 9000" @@ -49,38 +43,29 @@ def drain_traffic_from_ip_trunk( @step("Deprovision IP trunk [DRY RUN]") -def deprovision_ip_trunk_dry( - subscription: Iptrunk, process_id: UUIDstr -) -> State: +def deprovision_ip_trunk_dry(subscription: Iptrunk, process_id: UUIDstr) -> State: provisioning_proxy.deprovision_ip_trunk(subscription, process_id, True) return { "subscription": subscription, - "label_text": "This is a dry run for the termination of an IP " - "trunk. " - "Press refresh to get the results", + "label_text": "This is a dry run for the termination of an IP " "trunk. " "Press refresh to get the results", } @step("Deprovision IP trunk [FOR REAL]") -def deprovision_ip_trunk_real( - subscription: Iptrunk, process_id: UUIDstr -) -> State: +def deprovision_ip_trunk_real(subscription: Iptrunk, process_id: UUIDstr) -> State: provisioning_proxy.deprovision_ip_trunk(subscription, process_id, False) return { "subscription": subscription, - "label_text": "This is a termination of an IP trunk. " - "Press refresh to get the results", + "label_text": "This is a termination of an IP trunk. " "Press refresh to get the results", } @step("Deprovision IPv4 networks") def deprovision_ip_trunk_ipv4(subscription: Iptrunk) -> None: service_network = ipam.delete_service_network( - network=ipaddress.ip_network( - subscription.iptrunk.iptrunk_ipv4_network - ), + network=ipaddress.ip_network(subscription.iptrunk.iptrunk_ipv4_network), service_type="TRUNK", ) return {"service_network": service_network} @@ -89,9 +74,7 @@ def deprovision_ip_trunk_ipv4(subscription: Iptrunk) -> None: @step("Deprovision IPv6 networks") def deprovision_ip_trunk_ipv6(subscription: Iptrunk) -> None: service_network = ipam.delete_service_network( - network=ipaddress.ip_network( - subscription.iptrunk.iptrunk_ipv6_network - ), + network=ipaddress.ip_network(subscription.iptrunk.iptrunk_ipv6_network), service_type="TRUNK", ) return {"service_network": service_network} @@ -99,9 +82,7 @@ def deprovision_ip_trunk_ipv6(subscription: Iptrunk) -> None: @workflow( "Terminate IPtrunk", - initial_input_form=wrap_modify_initial_input_form( - initial_input_form_generator - ), + initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator), target=Target.TERMINATE, ) def terminate_iptrunk(): diff --git a/gso/workflows/site/create_site.py b/gso/workflows/site/create_site.py index 756c992a519e349b088abe2642efd2f616ae3b3f..0a893a70e61d2a381acbe46866d236c33db41e80 100644 --- a/gso/workflows/site/create_site.py +++ b/gso/workflows/site/create_site.py @@ -66,18 +66,14 @@ def initialize_subscription( subscription.description = f"Site {site_name}" - subscription = site.SiteProvisioning.from_other_lifecycle( - subscription, SubscriptionLifecycle.PROVISIONING - ) + subscription = site.SiteProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING) return {"subscription": subscription} @workflow( "Create Site", - initial_input_form=wrap_create_initial_input_form( - initial_input_form_generator - ), + initial_input_form=wrap_create_initial_input_form(initial_input_form_generator), target=Target.CREATE, ) def create_site(): diff --git a/setup.py b/setup.py index 5dbfaf3364045bdbc9ea7c5b0ca84da3ebad47a2..e6e2fd8bd0a414877cbda505b5b2e2042e6556d2 100644 --- a/setup.py +++ b/setup.py @@ -1,16 +1,16 @@ from setuptools import find_packages, setup setup( - name='geant-service-orchestrator', + name="geant-service-orchestrator", version="0.1", - author='GEANT', - author_email='swd@geant.org', - description='GEANT Service Orchestrator', - url='https://gitlab.geant.org/goat/geant-service-orchestrator', + author="GEANT", + author_email="swd@geant.org", + description="GEANT Service Orchestrator", + url="https://gitlab.geant.org/goat/geant-service-orchestrator", packages=find_packages(), install_requires=[ - 'orchestrator-core==1.0.0', - 'pydantic', - 'requests', - ] + "orchestrator-core==1.0.0", + "pydantic", + "requests", + ], ) diff --git a/test/conftest.py b/test/conftest.py index 04952145b61bd374ca02dc44abb1c00c522f9ac8..5ac3eb9c18167a7395e27a28b3ded0e69070b60e 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -7,16 +7,13 @@ import tempfile import pytest -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def configuration_data(): - with contextlib.closing( - socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: - s.bind(('', 0)) + with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.bind(("", 0)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) yield { - "GENERAL": { - "public_hostname": "https://gap.geant.org" - }, + "GENERAL": {"public_hostname": "https://gap.geant.org"}, "RESOURCE_MANAGER_API_PREFIX": "http://localhost:44444", "IPAM": { "INFOBLOX": { @@ -24,97 +21,56 @@ def configuration_data(): "wapi_version": "v2.12", "host": "10.0.0.1", "username": "robot-user", - "password": "robot-user-password" + "password": "robot-user-password", }, "LO": { - "V4": { - "containers": [], - "networks": ["10.255.255.0/26"], - "mask": 32 - }, - "V6": { - "containers": [], - "networks": ["dead:beef::/80"], - "mask": 128 - }, + "V4": {"containers": [], "networks": ["10.255.255.0/26"], "mask": 32}, + "V6": {"containers": [], "networks": ["dead:beef::/80"], "mask": 128}, "domain_name": ".lo", - "dns_view": "default" + "dns_view": "default", }, "TRUNK": { - "V4": { - "containers": ["10.255.255.0/24", "10.255.254.0/24"], - "networks": [], - "mask": 31 - }, - "V6": { - "containers": ["dead:beef::/64", "dead:beee::/64"], - "networks": [], - "mask": 126 - }, + "V4": {"containers": ["10.255.255.0/24", "10.255.254.0/24"], "networks": [], "mask": 31}, + "V6": {"containers": ["dead:beef::/64", "dead:beee::/64"], "networks": [], "mask": 126}, "domain_name": ".trunk", - "dns_view": "default" + "dns_view": "default", }, "GEANT_IP": { - "V4": { - "containers": ["10.255.255.0/24", "10.255.254.0/24"], - "networks": [], - "mask": 31 - }, - "V6": { - "containers": ["dead:beef::/64", "dead:beee::/64"], - "networks": [], - "mask": 126 - }, + "V4": {"containers": ["10.255.255.0/24", "10.255.254.0/24"], "networks": [], "mask": 31}, + "V6": {"containers": ["dead:beef::/64", "dead:beee::/64"], "networks": [], "mask": 126}, "domain_name": ".geantip", - "dns_view": "default" + "dns_view": "default", }, "SI": { - "V4": { - "containers": ["10.255.253.128/25"], - "networks": [], - "mask": 31 - }, - "V6": { - "containers": [], - "networks": [], - "mask": 126 - }, + "V4": {"containers": ["10.255.253.128/25"], "networks": [], "mask": 31}, + "V6": {"containers": [], "networks": [], "mask": 126}, "domain_name": ".geantip", - "dns_view": "default" + "dns_view": "default", }, "LT_IAS": { - "V4": { - "containers": ["10.255.255.0/24"], - "networks": [], - "mask": 31 - }, - "V6": { - "containers": ["dead:beef:cc::/48"], - "networks": [], - "mask": 126 - }, + "V4": {"containers": ["10.255.255.0/24"], "networks": [], "mask": 31}, + "V6": {"containers": ["dead:beef:cc::/48"], "networks": [], "mask": 126}, "domain_name": ".geantip", - "dns_view": "default" - } + "dns_view": "default", + }, }, "PROVISIONING_PROXY": { "scheme": "https", "api_base": "localhost:44444", "auth": "Bearer <token>", - "api_version": 1123 - } + "api_version": 1123, + }, } -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def data_config_filename(configuration_data): - file_name = os.path.join( - tempfile.gettempdir(), os.urandom(24).hex()) - open(file_name, 'x').close() - with open(file_name, 'wb') as f: - f.write(json.dumps(configuration_data).encode('utf-8')) + file_name = os.path.join(tempfile.gettempdir(), os.urandom(24).hex()) + open(file_name, "x").close() + with open(file_name, "wb") as f: + f.write(json.dumps(configuration_data).encode("utf-8")) f.flush() - os.environ['OSS_PARAMS_FILENAME'] = f.name + os.environ["OSS_PARAMS_FILENAME"] = f.name yield f.name diff --git a/test/test_ipam.py b/test/test_ipam.py index ef1e7111a90913525a3393128a33992aacf7a8cc..cea110810dc23d4bf910ae631fe6f3b99d9d473a 100644 --- a/test/test_ipam.py +++ b/test/test_ipam.py @@ -11,31 +11,30 @@ from gso.services import ipam def test_new_service_networks(data_config_filename): responses.add( method=responses.POST, - url=re.compile(r'.*/wapi.*/network.*'), + url=re.compile(r".*/wapi.*/network.*"), json={ - '_ref': 'network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default', # noqa: E501 - 'network': '10.255.255.20/32' - } + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 + "network": "10.255.255.20/32", + }, ) responses.add( method=responses.POST, - url=re.compile(r'.*/wapi.*/ipv6network.*'), + url=re.compile(r".*/wapi.*/ipv6network.*"), json={ - '_ref': 'ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default', # noqa: E501 - 'network': 'dead:beef::18/128' - } + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + "network": "dead:beef::18/128", + }, ) - service_networks = ipam.new_service_networks(service_type='TRUNK') + service_networks = ipam.new_service_networks(service_type="TRUNK") assert service_networks == ipam.ServiceNetworks( - v4=ipaddress.ip_network('10.255.255.20/32'), - v6=ipaddress.ip_network('dead:beef::18/128') + v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") ) # should fail because this service type has networks instead of containers with pytest.raises(AssertionError): - service_networks = ipam.new_service_networks(service_type='LO') + service_networks = ipam.new_service_networks(service_type="LO") assert service_networks is None @@ -43,367 +42,338 @@ def test_new_service_networks(data_config_filename): def test_new_service_host(data_config_filename): responses.add( method=responses.POST, - url=re.compile(r'.*/wapi.*/record:host$'), - json='record:host/ZG5zLmhvc3QkLm5vbl9ETlNfaG9zdF9yb290LjAuMTY4MzcwNTU4MzY3MC5nc28udGVzdA:test.lo/%20' # noqa: E501 + url=re.compile(r".*/wapi.*/record:host$"), + json="record:host/ZG5zLmhvc3QkLm5vbl9ETlNfaG9zdF9yb290LjAuMTY4MzcwNTU4MzY3MC5nc28udGVzdA:test.lo/%20", # noqa: E501 ) responses.add( method=responses.POST, - url=re.compile(r'.*/wapi.*/record:a$'), - json='record:a/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default' # noqa: E501 + url=re.compile(r".*/wapi.*/record:a$"), + json="record:a/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default", # noqa: E501 ) responses.add( method=responses.POST, - url=re.compile(r'.*/wapi.*/record:aaaa$'), - json='record:aaaa/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default' # noqa: E501 + url=re.compile(r".*/wapi.*/record:aaaa$"), + json="record:aaaa/ZG5zLmJpbmRfYSQuX2RlZmF1bHQuZ3NvLHRlc3QsMTAuMjU1LjI1NS44:test.lo/default", # noqa: E501 ) responses.add( method=responses.GET, - url=re.compile(r'.*/wapi.*/network.*10.255.255.*'), + url=re.compile(r".*/wapi.*/network.*10.255.255.*"), json=[ { "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 "network": "10.255.255.20/32", - "network_view": "default" + "network_view": "default", } - ] + ], ) responses.add( method=responses.GET, - url=re.compile(r'.*/wapi.*/network.*10.255.254.*'), + url=re.compile(r".*/wapi.*/network.*10.255.254.*"), json=[ { - "_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:10.255.254.20/32/default", # noqa: E501 + "_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:10.255.254.20/32/default", # noqa: E501 "network": "10.255.254.20/32", - "network_view": "default" + "network_view": "default", } - ] + ], ) responses.add( method=responses.GET, - url=re.compile(r'.*/wapi.*/ipv6network.*dead.*beef.*'), + url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"), json=[ { "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 "network": "dead:beef::18/128", - "network_view": "default" + "network_view": "default", } - ] + ], ) - responses.add( - method=responses.GET, - url=re.compile(r'.*/wapi.*/ipv6network.*beef.*dead.*'), - json=[] - ) + responses.add(method=responses.GET, url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), json=[]) responses.add( method=responses.POST, - url=re.compile(r'.*/wapi.*/network/.*10.255.255.*?_function=next_available_ip&num=1$'), # noqa: E501 - json={'ips': ['10.255.255.20']} + url=re.compile(r".*/wapi.*/network/.*10.255.255.*?_function=next_available_ip&num=1$"), # noqa: E501 + json={"ips": ["10.255.255.20"]}, ) responses.add( method=responses.POST, - url=re.compile(r'.*/wapi.*/network/.*10.255.254.*?_function=next_available_ip&num=1$'), # noqa: E501 + url=re.compile(r".*/wapi.*/network/.*10.255.254.*?_function=next_available_ip&num=1$"), # noqa: E501 body="Cannot find 1 available IP address(es) in this network", - status=400 + status=400, ) responses.add( method=responses.POST, - url=re.compile(r'.*/wapi.*/ipv6network/.*?_function=next_available_ip&num=1$'), # noqa: E501 - json={'ips': ['dead:beef::18']} + url=re.compile(r".*/wapi.*/ipv6network/.*?_function=next_available_ip&num=1$"), # noqa: E501 + json={"ips": ["dead:beef::18"]}, ) responses.add( method=responses.POST, - url=re.compile(r'.*/wapi.*/network.*_return_fields.*'), + url=re.compile(r".*/wapi.*/network.*_return_fields.*"), json={ - '_ref': 'network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default', # noqa: E501 - 'network': '10.255.255.20/32' - } + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.20/32/default", # noqa: E501 + "network": "10.255.255.20/32", + }, ) responses.add( method=responses.POST, - url=re.compile(r'.*/wapi.*/ipv6network.*_return_fields.*'), + url=re.compile(r".*/wapi.*/ipv6network.*_return_fields.*"), json={ - '_ref': 'ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default', # noqa: E501 - 'network': 'dead:beef::18/128' - } + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + "network": "dead:beef::18/128", + }, ) # test host creation by IP addresses service_hosts = ipam.new_service_host( - hostname='test', - service_type='TRUNK', + hostname="test", + service_type="TRUNK", host_addresses=ipam.HostAddresses( - v4=ipaddress.ip_address('10.255.255.20'), - v6=ipaddress.ip_address('dead:beef::18') - ) + v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") + ), ) assert service_hosts == ipam.HostAddresses( - v4=ipaddress.ip_address('10.255.255.20'), - v6=ipaddress.ip_address('dead:beef::18') + v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") ) # test host creation by network addresses service_hosts = ipam.new_service_host( - hostname='test', - service_type='TRUNK', + hostname="test", + service_type="TRUNK", service_networks=ipam.ServiceNetworks( - v4=ipaddress.ip_network('10.255.255.20/32'), - v6=ipaddress.ip_network('dead:beef::18/128') - ) + v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") + ), ) assert service_hosts == ipam.HostAddresses( - v4=ipaddress.ip_address('10.255.255.20'), - v6=ipaddress.ip_address('dead:beef::18') + v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") ) # test host creation by just service_type when service cfg uses networks - service_hosts = ipam.new_service_host( - hostname='test', - service_type='LO' - ) + service_hosts = ipam.new_service_host(hostname="test", service_type="LO") assert service_hosts == ipam.HostAddresses( - v4=ipaddress.ip_address('10.255.255.20'), - v6=ipaddress.ip_address('dead:beef::18') + v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") ) # test host creation by just service_type when service cfg uses containers - service_hosts = ipam.new_service_host( - hostname='test', - service_type='TRUNK' - ) + service_hosts = ipam.new_service_host(hostname="test", service_type="TRUNK") assert service_hosts == ipam.HostAddresses( - v4=ipaddress.ip_address('10.255.255.20'), - v6=ipaddress.ip_address('dead:beef::18') + v4=ipaddress.ip_address("10.255.255.20"), v6=ipaddress.ip_address("dead:beef::18") ) # test host creation that should return a no available IP error with pytest.raises(AssertionError): service_hosts = ipam.new_service_host( - hostname='test', - service_type='TRUNK', + hostname="test", + service_type="TRUNK", service_networks=ipam.ServiceNetworks( - v4=ipaddress.ip_network('10.255.254.20/32'), - v6=ipaddress.ip_network('dead:beef::18/128') - ) + v4=ipaddress.ip_network("10.255.254.20/32"), v6=ipaddress.ip_network("dead:beef::18/128") + ), ) assert service_hosts is None # test host creation that should return a network not exist error with pytest.raises(AssertionError): service_hosts = ipam.new_service_host( - hostname='test', - service_type='TRUNK', + hostname="test", + service_type="TRUNK", service_networks=ipam.ServiceNetworks( - v4=ipaddress.ip_network('10.255.255.20/32'), - v6=ipaddress.ip_network('beef:dead::18/128') - ) + v4=ipaddress.ip_network("10.255.255.20/32"), v6=ipaddress.ip_network("beef:dead::18/128") + ), ) assert service_hosts is None @responses.activate def test_delete_service_network(data_config_filename): - responses.add( method=responses.GET, - url=re.compile(r'.*/wapi.*/network.*10.255.255.0.*'), + url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), json=[ { - "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 + "_ref": "network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 "network": "10.255.255.0/26", - "network_view": "default" + "network_view": "default", } - ] + ], ) responses.add( method=responses.GET, - url=re.compile(r'.*/wapi.*/network.*10.255.255.20.*'), + url=re.compile(r".*/wapi.*/network.*10.255.255.20.*"), json=[ { - "_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501 + "_ref": "network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501 "network": "100.255.255.20/32", - "network_view": "default" + "network_view": "default", } - ] + ], ) responses.add( method=responses.GET, - url=re.compile(r'.*/wapi.*/ipv6network.*dead.*beef.*'), + url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"), json=[ { - "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 "network": "dead:beef::18/128", - "network_view": "default" + "network_view": "default", } - ] + ], ) responses.add( method=responses.GET, - url=re.compile(r'.*/wapi.*/ipv6network.*beef.*dead.*'), + url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), json=[ { - "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501 + "_ref": "ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501 "network": "beef:dead::18/128", - "network_view": "default" + "network_view": "default", } - ] + ], ) responses.add( method=responses.DELETE, - url=re.compile(r'.*/wapi.*/network.*10.255.255.0.*'), - body="network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default" # noqa: E501 + url=re.compile(r".*/wapi.*/network.*10.255.255.0.*"), + body="network/ZG5zLm5ldHdvcmskMTAuMjU1LjI1NS4yMC8zMi8w:10.255.255.0/26/default", # noqa: E501 ) responses.add( method=responses.DELETE, - url=re.compile(r'.*/wapi.*/network.*100.255.255.*'), - body="network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default" # noqa: E501 + url=re.compile(r".*/wapi.*/network.*100.255.255.*"), + body="network/ZG5zLm5Gd0VHQkRQUjMzLjMwNzIuMzE1LzAyLzI:100.255.255.20/32/default", # noqa: E501 ) responses.add( method=responses.DELETE, - url=re.compile(r'.*/wapi.*/ipv6network.*dead.*beef.*'), - body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default" # noqa: E501 + url=re.compile(r".*/wapi.*/ipv6network.*dead.*beef.*"), + body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:dead%3Abeef%3A%3A18/128/default", # noqa: E501 ) responses.add( method=responses.DELETE, - url=re.compile(r'.*/wapi.*/ipv6network.*beef.*dead.*'), - body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default" # noqa: E501 + url=re.compile(r".*/wapi.*/ipv6network.*beef.*dead.*"), + body="ipv6network/ZG5zLm5ldHdvcmskZGVhZDpiZWVmOjoxOC8xMjgvMA:beef%3Adead%3A%3A18/128/default", # noqa: E501 ) - service_network = ipam.delete_service_network( - network=ipaddress.ip_network('10.255.255.0/26'), service_type='LO' - ) - assert service_network == ipam.V4ServiceNetwork( - v4=ipaddress.ip_network('10.255.255.0/26') - ) + service_network = ipam.delete_service_network(network=ipaddress.ip_network("10.255.255.0/26"), service_type="LO") + assert service_network == ipam.V4ServiceNetwork(v4=ipaddress.ip_network("10.255.255.0/26")) with pytest.raises(AssertionError): service_network = ipam.delete_service_network( - network=ipaddress.ip_network('10.255.255.20/32'), - service_type='LO' + network=ipaddress.ip_network("10.255.255.20/32"), service_type="LO" ) assert service_network is None service_network = ipam.delete_service_network( - network=ipaddress.ip_network('dead:beef::18/128'), - service_type='TRUNK' - ) - assert service_network == ipam.V6ServiceNetwork( - v6=ipaddress.ip_network('dead:beef::18/128') + network=ipaddress.ip_network("dead:beef::18/128"), service_type="TRUNK" ) + assert service_network == ipam.V6ServiceNetwork(v6=ipaddress.ip_network("dead:beef::18/128")) with pytest.raises(AssertionError): service_network = ipam.delete_service_network( - network=ipaddress.ip_network('beef:dead::18/128'), - service_type='TRUNK' + network=ipaddress.ip_network("beef:dead::18/128"), service_type="TRUNK" ) assert service_network is None @responses.activate def test_delete_service_host(data_config_filename): - responses.add( method=responses.GET, - url=re.compile(r'.*/wapi.*record:host.*'), + url=re.compile(r".*/wapi.*record:host.*"), json=[ { - '_ref': 'record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default', # noqa: E501 - 'ipv4addrs': [ + "_ref": "record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYV9sbw:ha_lo.gso/default", # noqa: E501 + "ipv4addrs": [ { - '_ref': 'record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuMS40.255.255.1/ha_lo.gso/default', # noqa: E501 - 'configure_for_dhcp': False, - 'host': 'ha_lo.gso', 'ipv4addr': '10.255.255.1' + "_ref": "record:host_ipv4addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZ3NvLmhhX2xvLjEwLjI1NS4yNTUuMS40.255.255.1/ha_lo.gso/default", # noqa: E501 + "configure_for_dhcp": False, + "host": "ha_lo.gso", + "ipv4addr": "10.255.255.1", } ], - 'ipv6addrs': [ + "ipv6addrs": [ { - '_ref': 'record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4:dead%3Abeef%3A%3A1/ha_lo.gso/default', # noqa: E501 - 'configure_for_dhcp': False, - 'host': 'ha_lo.gso', 'ipv6addr': 'dead:beef::1' + "_ref": "record:host_ipv6addr/ZG5zLmhvc3RfYWRkcmVzcyQuX2RlZmF1bHQuZvLmhhX2xvLmRlYWQ6YmVlZjo6MS4:dead%3Abeef%3A%3A1/ha_lo.gso/default", # noqa: E501 + "configure_for_dhcp": False, + "host": "ha_lo.gso", + "ipv6addr": "dead:beef::1", } ], - 'name': 'ha_lo.gso', 'view': 'default' + "name": "ha_lo.gso", + "view": "default", } - ] + ], ) responses.add( method=responses.GET, - url=re.compile(r'.*/wapi.*record:cname.*'), + url=re.compile(r".*/wapi.*record:cname.*"), json=[ { - '_ref': 'record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczE:alias1.ha.gso/default', # noqa: E501 - 'canonical': 'hA_LO.lo', 'name': 'alias1.ha.lo', - 'view': 'default' + "_ref": "record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczE:alias1.ha.gso/default", # noqa: E501 + "canonical": "hA_LO.lo", + "name": "alias1.ha.lo", + "view": "default", }, { - '_ref': 'record:cname/5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczI:alias2.ha.gso/default', # noqa: E501 - 'canonical': 'hA_LO.lo', 'name': 'alias2.ha.lo', - 'view': 'default' - } - ] + "_ref": "record:cname/5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYS5hbGlhczI:alias2.ha.gso/default", # noqa: E501 + "canonical": "hA_LO.lo", + "name": "alias2.ha.lo", + "view": "default", + }, + ], ) responses.add( method=responses.DELETE, - url=re.compile(r'.*/wapi.*record:host.*'), - body='record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYl9sbw:hb_lo.gso/default' # noqa: E501 + url=re.compile(r".*/wapi.*record:host.*"), + body="record:host/ZG5zLmhvc3QkLl9kZWZhdWx0Lmdzby5oYl9sbw:hb_lo.gso/default", # noqa: E501 ) responses.add( method=responses.DELETE, - url=re.compile(r'.*/wapi.*record:cname.*'), - body='record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYi5hbGlhczE:alias1.hb.gso/default' # noqa: E501 + url=re.compile(r".*/wapi.*record:cname.*"), + body="record:cname/ZG5zLmJpbmRfY25hbWUkLl9kZWZhdWx0Lmdzby5oYi5hbGlhczE:alias1.hb.gso/default", # noqa: E501 ) input_host_addresses = ipam.HostAddresses( - v4=ipaddress.ip_address('10.255.255.1'), - v6=ipaddress.ip_address('dead:beef::1') + v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") ) host_addresses = ipam.delete_service_host( - hostname='ha_lo', + hostname="ha_lo", host_addresses=input_host_addresses, - cname_aliases=['alias1.ha', 'alias2.ha'], - service_type='LO' + cname_aliases=["alias1.ha", "alias2.ha"], + service_type="LO", ) assert host_addresses == ipam.HostAddresses( - v4=ipaddress.ip_address('10.255.255.1'), - v6=ipaddress.ip_address('dead:beef::1') + v4=ipaddress.ip_address("10.255.255.1"), v6=ipaddress.ip_address("dead:beef::1") ) # Fail because missing CNAME with pytest.raises(AssertionError): host_addresses = ipam.delete_service_host( - hostname='ha_lo', - host_addresses=input_host_addresses, - cname_aliases=['alias1.ha'], - service_type='LO' + hostname="ha_lo", host_addresses=input_host_addresses, cname_aliases=["alias1.ha"], service_type="LO" ) assert host_addresses is None # Fail because non-matching CNAME with pytest.raises(AssertionError): host_addresses = ipam.delete_service_host( - hostname='ha_lo', + hostname="ha_lo", host_addresses=input_host_addresses, - cname_aliases=['alias1.ha', 'alias2.ha', 'alias3.ha'], - service_type='LO' + cname_aliases=["alias1.ha", "alias2.ha", "alias3.ha"], + service_type="LO", ) assert host_addresses is None