diff --git a/gso/products/product_blocks/iptrunk.py b/gso/products/product_blocks/iptrunk.py index 8410fe352ebd84da58aa82e154130144637db1d4..bab3f4494fd0f40063eecbdc260652f62a6df9f2 100644 --- a/gso/products/product_blocks/iptrunk.py +++ b/gso/products/product_blocks/iptrunk.py @@ -1,7 +1,7 @@ """IP trunk product block that has all parameters of a subscription throughout its lifecycle.""" import ipaddress -from typing import Optional, TypeVar +from typing import TypeVar from orchestrator.domain.base import ProductBlockModel from orchestrator.forms.validators import UniqueConstrainedList @@ -28,24 +28,24 @@ class IptrunkSideBlockInactive( ProductBlockModel, lifecycle=[SubscriptionLifecycle.INITIAL], product_block_name="IptrunkSideBlock" ): iptrunk_side_node: RouterBlockInactive - iptrunk_side_ae_iface: Optional[str] = None - iptrunk_side_ae_geant_a_sid: Optional[str] = None + iptrunk_side_ae_iface: str | None = None + iptrunk_side_ae_geant_a_sid: str | None = None iptrunk_side_ae_members: list[str] = Field(default_factory=list) iptrunk_side_ae_members_description: list[str] = Field(default_factory=list) class IptrunkSideBlockProvisioning(IptrunkSideBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]): iptrunk_side_node: RouterBlockProvisioning - iptrunk_side_ae_iface: Optional[str] = None - iptrunk_side_ae_geant_a_sid: Optional[str] = None + iptrunk_side_ae_iface: str | None = None + iptrunk_side_ae_geant_a_sid: str | None = None iptrunk_side_ae_members: list[str] = Field(default_factory=list) iptrunk_side_ae_members_description: list[str] = Field(default_factory=list) class IptrunkSideBlock(IptrunkSideBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): iptrunk_side_node: RouterBlock - iptrunk_side_ae_iface: Optional[str] = None - iptrunk_side_ae_geant_a_sid: Optional[str] = None + iptrunk_side_ae_iface: str | None = None + iptrunk_side_ae_geant_a_sid: str | None = None iptrunk_side_ae_members: list[str] = Field(default_factory=list) iptrunk_side_ae_members_description: list[str] = Field(default_factory=list) @@ -55,14 +55,14 @@ class IptrunkBlockInactive( ): """A trunk that's currently inactive, see :class:`IptrunkBlock`.""" - geant_s_sid: Optional[str] = None - iptrunk_description: Optional[str] = None - iptrunk_type: Optional[IptrunkType] = None - iptrunk_speed: Optional[str] = None - iptrunk_minimum_links: Optional[int] = None - iptrunk_isis_metric: Optional[int] = None - iptrunk_ipv4_network: Optional[ipaddress.IPv4Network] = None - iptrunk_ipv6_network: Optional[ipaddress.IPv6Network] = None + geant_s_sid: str | None = None + iptrunk_description: str | None = None + iptrunk_type: IptrunkType | None = None + iptrunk_speed: str | None = None + iptrunk_minimum_links: int | None = None + iptrunk_isis_metric: int | None = None + iptrunk_ipv4_network: ipaddress.IPv4Network | None = None + iptrunk_ipv6_network: ipaddress.IPv6Network | None = None # iptrunk_sides: IptrunkSides[IptrunkSideBlockInactive] @@ -70,14 +70,14 @@ class IptrunkBlockInactive( class IptrunkBlockProvisioning(IptrunkBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]): """A trunk that's currently being provisioned, see :class:`IptrunkBlock`.""" - geant_s_sid: Optional[str] = None - iptrunk_description: Optional[str] = None - iptrunk_type: Optional[IptrunkType] = None - iptrunk_speed: Optional[str] = None - iptrunk_minimum_links: Optional[int] = None - iptrunk_isis_metric: Optional[int] = None - iptrunk_ipv4_network: Optional[ipaddress.IPv4Network] = None - iptrunk_ipv6_network: Optional[ipaddress.IPv6Network] = None + geant_s_sid: str | None = None + iptrunk_description: str | None = None + iptrunk_type: IptrunkType | None = None + iptrunk_speed: str | None = None + iptrunk_minimum_links: int | None = None + iptrunk_isis_metric: int | None = None + iptrunk_ipv4_network: ipaddress.IPv4Network | None = None + iptrunk_ipv6_network: ipaddress.IPv6Network | None = None # iptrunk_sides: IptrunkSides[IptrunkSideBlockProvisioning] diff --git a/gso/products/product_blocks/router.py b/gso/products/product_blocks/router.py index 3bb9b498e2f93c07a36ebfdbada38a86cf40d085..731da45f1e9a4ddb576f762a78cdf2b7aa7446c3 100644 --- a/gso/products/product_blocks/router.py +++ b/gso/products/product_blocks/router.py @@ -1,6 +1,5 @@ """Product block for :class:`Router` products.""" import ipaddress -from typing import Optional from orchestrator.domain.base import ProductBlockModel from orchestrator.types import SubscriptionLifecycle, strEnum @@ -29,19 +28,19 @@ class RouterBlockInactive( ): """A router that's being currently inactive. See :class:`RouterBlock`.""" - router_fqdn: Optional[str] = None - router_ts_port: Optional[PortNumber] = None - router_access_via_ts: Optional[bool] = None - router_lo_ipv4_address: Optional[ipaddress.IPv4Address] = None - router_lo_ipv6_address: Optional[ipaddress.IPv6Address] = None - router_lo_iso_address: Optional[str] = None - router_si_ipv4_network: Optional[ipaddress.IPv4Network] = None - router_ias_lt_ipv4_network: Optional[ipaddress.IPv4Network] = None - router_ias_lt_ipv6_network: Optional[ipaddress.IPv6Network] = None - router_vendor: Optional[RouterVendor] = None - router_role: Optional[RouterRole] = None - router_site: Optional[SiteBlockInactive] - router_is_ias_connected: Optional[bool] = None + router_fqdn: str | None = None + router_ts_port: PortNumber | None = None + router_access_via_ts: bool | None = None + router_lo_ipv4_address: ipaddress.IPv4Address | None = None + router_lo_ipv6_address: ipaddress.IPv6Address | None = None + router_lo_iso_address: str | None = None + router_si_ipv4_network: ipaddress.IPv4Network | None = None + router_ias_lt_ipv4_network: ipaddress.IPv4Network | None = None + router_ias_lt_ipv6_network: ipaddress.IPv6Network | None = None + router_vendor: RouterVendor | None = None + router_role: RouterRole | None = None + router_site: SiteBlockInactive | None + router_is_ias_connected: bool | None = None def generate_fqdn(hostname: str, site_name: str, country_code: str) -> str: @@ -53,17 +52,17 @@ class RouterBlockProvisioning(RouterBlockInactive, lifecycle=[SubscriptionLifecy router_fqdn: str router_ts_port: PortNumber - router_access_via_ts: Optional[bool] = None - router_lo_ipv4_address: Optional[ipaddress.IPv4Address] = None - router_lo_ipv6_address: Optional[ipaddress.IPv6Address] = None - router_lo_iso_address: Optional[str] = None - router_si_ipv4_network: Optional[ipaddress.IPv4Network] = None - router_ias_lt_ipv4_network: Optional[ipaddress.IPv4Network] = None - router_ias_lt_ipv6_network: Optional[ipaddress.IPv6Network] = None - router_vendor: Optional[RouterVendor] = None - router_role: Optional[RouterRole] = None - router_site: Optional[SiteBlockProvisioning] - router_is_ias_connected: Optional[bool] = None + router_access_via_ts: bool | None = None + router_lo_ipv4_address: ipaddress.IPv4Address | None = None + router_lo_ipv6_address: ipaddress.IPv6Address | None = None + router_lo_iso_address: str | None = None + router_si_ipv4_network: ipaddress.IPv4Network | None = None + router_ias_lt_ipv4_network: ipaddress.IPv4Network | None = None + router_ias_lt_ipv6_network: ipaddress.IPv6Network | None = None + router_vendor: RouterVendor | None = None + router_role: RouterRole | None = None + router_site: SiteBlockProvisioning | None + router_is_ias_connected: bool | None = None class RouterBlock(RouterBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): @@ -81,11 +80,11 @@ class RouterBlock(RouterBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTI """The IPv6 loopback address of the router.""" router_lo_iso_address: str """The :term:`ISO` :term:`NET` of the router, used for :term:`IS-IS` support.""" - router_si_ipv4_network: Optional[ipaddress.IPv4Network] + router_si_ipv4_network: ipaddress.IPv4Network | None """The SI IPv4 network of the router.""" - router_ias_lt_ipv4_network: Optional[ipaddress.IPv4Network] + router_ias_lt_ipv4_network: ipaddress.IPv4Network | None """The IAS LT IPv4 network of the router.""" - router_ias_lt_ipv6_network: Optional[ipaddress.IPv6Network] + router_ias_lt_ipv6_network: ipaddress.IPv6Network | None """The IAS LT IPv6 network of the router.""" router_vendor: RouterVendor """The vendor of the router, can be any of the values defined in :class:`RouterVendor`.""" diff --git a/gso/products/product_blocks/site.py b/gso/products/product_blocks/site.py index 4a21313b22b5b6f9f23f5184f0c41df0d86b5174..25eb144e1a7539f5f224325b429dcb924a7f74ef 100644 --- a/gso/products/product_blocks/site.py +++ b/gso/products/product_blocks/site.py @@ -1,6 +1,4 @@ """The product block that describes a site subscription.""" -from typing import Optional - from orchestrator.domain.base import ProductBlockModel from orchestrator.types import SubscriptionLifecycle, strEnum @@ -23,31 +21,31 @@ class SiteTier(strEnum): class SiteBlockInactive(ProductBlockModel, lifecycle=[SubscriptionLifecycle.INITIAL], product_block_name="SiteBlock"): """A site that's currently inactive, see :class:`SiteBlock`.""" - site_name: Optional[str] = None - site_city: Optional[str] = None - site_country: Optional[str] = None - site_country_code: Optional[str] = None - site_latitude: Optional[LatitudeCoordinate] = None - site_longitude: Optional[LongitudeCoordinate] = None - site_internal_id: Optional[int] = None - site_bgp_community_id: Optional[int] = None - site_tier: Optional[SiteTier] = None - site_ts_address: Optional[str] = None + site_name: str | None = None + site_city: str | None = None + site_country: str | None = None + site_country_code: str | None = None + site_latitude: LatitudeCoordinate | None = None + site_longitude: LongitudeCoordinate | None = None + site_internal_id: int | None = None + site_bgp_community_id: int | None = None + site_tier: SiteTier | None = None + site_ts_address: str | None = None class SiteBlockProvisioning(SiteBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]): """A site that's currently being provisioned, see :class:`SiteBlock`.""" - site_name: Optional[str] = None - site_city: Optional[str] = None - site_country: Optional[str] = None - site_country_code: Optional[str] = None - site_latitude: Optional[LatitudeCoordinate] = None - site_longitude: Optional[LongitudeCoordinate] = None - site_internal_id: Optional[int] = None - site_bgp_community_id: Optional[int] = None - site_tier: Optional[SiteTier] = None - site_ts_address: Optional[str] = None + site_name: str | None = None + site_city: str | None = None + site_country: str | None = None + site_country_code: str | None = None + site_latitude: LatitudeCoordinate | None = None + site_longitude: LongitudeCoordinate | None = None + site_internal_id: int | None = None + site_bgp_community_id: int | None = None + site_tier: SiteTier | None = None + site_ts_address: str | None = None class SiteBlock(SiteBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): @@ -73,7 +71,7 @@ class SiteBlock(SiteBlockProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]) """The :term:`BGP` community ID of a site, used to advertise routes learned at this site.""" site_tier: SiteTier """The tier of a site, as described in :class:`SiteTier`.""" - site_ts_address: Optional[str] = None + site_ts_address: str | None = None """The address of the terminal server that this router is connected to. The terminal server provides out of band access. This is required in case a link goes down, or when a router is initially added to the network and it does not have any IP trunks connected to it yet.""" diff --git a/gso/services/crm.py b/gso/services/crm.py index 7568e3116eb2e685d45d1a6c67ff11d9b0bf8533..8c2af4698aa2c16478f372388df88ae6d125c308 100644 --- a/gso/services/crm.py +++ b/gso/services/crm.py @@ -1,4 +1,4 @@ -from typing import Any, Dict +from typing import Any class CustomerNotFoundError(Exception): @@ -16,7 +16,7 @@ def all_customers() -> list[dict]: ] -def get_customer_by_name(name: str) -> Dict[str, Any]: +def get_customer_by_name(name: str) -> dict[str, Any]: for customer in all_customers(): if customer["name"] == name: return customer diff --git a/gso/services/resource_manager.py b/gso/services/resource_manager.py deleted file mode 100644 index 88ef5e189e29382fca6b92a258d5f62de0cf6d72..0000000000000000000000000000000000000000 --- a/gso/services/resource_manager.py +++ /dev/null @@ -1,126 +0,0 @@ -# mypy: ignore-errors -from enum import Enum, auto -from typing import List - -from gso import settings - -# TODO -# - fill in the implementations -# - consider the extra API methods -# - decided what to do with various error conditions (currently assertions) - - -class InterfaceAllocationState(Enum): - AVAILABLE = auto() - RESERVED = auto() - ALLOCATED = auto() - - -def _dummy_router_interfaces(): - return { - "lags": [], - "physical": [{"name": f"ifc-{x}", "state": InterfaceAllocationState.AVAILABLE} for x in range(250)], - } - - -_DUMMY_INVENTORY = { - "fqdn-a": _dummy_router_interfaces(), - "fqdn-b": _dummy_router_interfaces(), - "fqdn-c": _dummy_router_interfaces(), - "fqdn-d": _dummy_router_interfaces(), -} - - -def import_new_router(new_router_fqdn: str, subscription_id: str, oss_params=settings.OSSParams): - # TODO: this is a dummy implementation - - # TODO: specifiy if this should be an error (and if now, what it means) - assert new_router_fqdn not in _DUMMY_INVENTORY - _DUMMY_INVENTORY[new_router_fqdn] = _dummy_router_interfaces() - - -def next_lag(router_fqdn: str, subscription_id: str, oss_params=settings.OSSParams) -> str: - # TODO: this is a dummy implementation - - assert router_fqdn in _DUMMY_INVENTORY - - lag_idx = 0 - while True: - lag_name = f"ae-{lag_idx}" - if lag_name not in _DUMMY_INVENTORY[router_fqdn]["lags"]: - _DUMMY_INVENTORY[router_fqdn]["lags"].append(lag_name) - return lag_name - lag_idx += 1 - - -def available_physical_interfaces(router_fqdn: str, oss_params=settings.OSSParams) -> List[str]: - # TODO: this is a dummy implementation - - assert router_fqdn in _DUMMY_INVENTORY - - return [ - ifc["name"] - for ifc in _DUMMY_INVENTORY[router_fqdn]["physical"] - if ifc["state"] == InterfaceAllocationState.AVAILABLE - ] - - -def _find_physical(router_fqdn: str, interface_name: str) -> dict: - assert router_fqdn in _DUMMY_INVENTORY - for ifc in _DUMMY_INVENTORY[router_fqdn]["physical"]: - if ifc["name"] == interface_name: - return ifc - raise AssertionError(f"interface {interface_name} not found on {router_fqdn}") - - -def reserve_physical_interface( - router_fqdn: str, interface_name: str, subscription_id: str, oss_params=settings.OSSParams -): - # TODO: this is a dummy implementation - - ifc = _find_physical(router_fqdn, interface_name) - assert ( - ifc["state"] == InterfaceAllocationState.AVAILABLE - ), f"interface {router_fqdn}:{interface_name} is not available" - ifc["state"] = InterfaceAllocationState.RESERVED - - -def allocate_physical_interface(router_fqdn: str, interface_name: str, oss_params=settings.OSSParams): - # TODO: this is a dummy implementation - - ifc = _find_physical(router_fqdn, interface_name) - # TODO: is there a use case for moving - # directly from AVAILABLE to ALLOCATED? - assert ( - ifc["state"] == InterfaceAllocationState.RESERVED - ), f"interface {router_fqdn}:{interface_name} is not reserved" - ifc["state"] = InterfaceAllocationState.RESERVED - - -def free_physical_interface(router_fqdn: str, interface_name: str, oss_params=settings.OSSParams): - # TODO: this is a dummy implementation - - ifc = _find_physical(router_fqdn, interface_name) - # TODO: is this truly an error that should be handled? - # or is it ok to ignore this? - assert ( - ifc["state"] != InterfaceAllocationState.AVAILABLE - ), f"interface {router_fqdn}:{interface_name} is already available" - ifc["state"] = InterfaceAllocationState.AVAILABLE - - -def free_lag(router_fqdn: str, lag_name: str, oss_params=settings.OSSParams): - # TODO: is this a use case that should be handled? - # e.g. who keeps track of the bundled physical interfaces? - pass - - -def remove_router(router_fqdn: str, oss_params=settings.OSSParams): - # TODO: is this a use case that should be handled? - pass - - -def all_lags(router_fqdn: str, oss_params=settings.OSSParams) -> List[str]: - # TODO: is this a use case that should be handled? - assert router_fqdn in _DUMMY_INVENTORY - return _DUMMY_INVENTORY[router_fqdn]["lags"] diff --git a/gso/workflows/iptrunk/migrate_iptrunk.py b/gso/workflows/iptrunk/migrate_iptrunk.py index a92dc29c6e78d3043cd52bd1508b4355630812b4..3b2c1b4d160dcfa3a51815c6b1b898f7e38e4dfb 100644 --- a/gso/workflows/iptrunk/migrate_iptrunk.py +++ b/gso/workflows/iptrunk/migrate_iptrunk.py @@ -1,6 +1,6 @@ import re from logging import getLogger -from typing import NoReturn, Optional +from typing import NoReturn from orchestrator import step, workflow from orchestrator.config.assignee import Assignee @@ -47,7 +47,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: tt_number: str replace_side: ReplacedSide # type: ignore[valid-type] warning_label: Label = "Are we moving to a different Site?" # type: ignore[assignment] - migrate_to_different_site: Optional[bool] = False + migrate_to_different_site: bool | None = False old_side_input = yield OldSideIptrunkForm diff --git a/gso/workflows/tasks/import_router.py b/gso/workflows/tasks/import_router.py index 4d157709cb8f1b3b2e9a391e27af0f8278ad6c93..0821e31414c822197cb419101562a4d74a5afaec 100644 --- a/gso/workflows/tasks/import_router.py +++ b/gso/workflows/tasks/import_router.py @@ -1,5 +1,4 @@ import ipaddress -from typing import Optional from uuid import UUID from orchestrator import workflow @@ -61,9 +60,9 @@ def initial_input_form_generator() -> FormGenerator: router_lo_ipv4_address: ipaddress.IPv4Address router_lo_ipv6_address: ipaddress.IPv6Address router_lo_iso_address: str - router_si_ipv4_network: Optional[ipaddress.IPv4Network] = None - router_ias_lt_ipv4_network: Optional[ipaddress.IPv4Network] = None - router_ias_lt_ipv6_network: Optional[ipaddress.IPv6Network] = None + router_si_ipv4_network: ipaddress.IPv4Network | None = None + router_ias_lt_ipv4_network: ipaddress.IPv4Network | None = None + router_ias_lt_ipv6_network: ipaddress.IPv6Network | None = None user_input = yield ImportRouter @@ -78,13 +77,13 @@ def initialize_subscription( router_vendor: router_pb.RouterVendor, router_site: str, router_role: router_pb.RouterRole, - is_ias_connected: Optional[bool] = None, - router_lo_ipv4_address: Optional[ipaddress.IPv4Address] = None, - router_lo_ipv6_address: Optional[ipaddress.IPv6Address] = None, - router_lo_iso_address: Optional[str] = None, - router_si_ipv4_network: Optional[ipaddress.IPv4Network] = None, - router_ias_lt_ipv4_network: Optional[ipaddress.IPv4Network] = None, - router_ias_lt_ipv6_network: Optional[ipaddress.IPv6Network] = None, + is_ias_connected: bool | None = None, + router_lo_ipv4_address: ipaddress.IPv4Address | None = None, + router_lo_ipv6_address: ipaddress.IPv6Address | None = None, + router_lo_iso_address: str | None = None, + router_si_ipv4_network: ipaddress.IPv4Network | None = None, + router_ias_lt_ipv4_network: ipaddress.IPv4Network | None = None, + router_ias_lt_ipv6_network: ipaddress.IPv6Network | None = None, ) -> State: subscription.router.router_ts_port = ts_port subscription.router.router_vendor = router_vendor diff --git a/test/test_resource_manager.py b/test/test_resource_manager.py deleted file mode 100644 index 34cf53506c94d37bc36cfbae31d6998cf88a7a30..0000000000000000000000000000000000000000 --- a/test/test_resource_manager.py +++ /dev/null @@ -1,51 +0,0 @@ -import random -import string - -from gso.services import resource_manager - - -def _random_string(n=None, letters=string.ascii_letters + string.digits + string.punctuation): - # ignoring S311 because this is a test - if not n: - n = random.randint(1, 20) # noqa: S311 - return "".join(random.choices(letters, k=n)) # noqa: S311 - - -def test_new_router(): - router_name = _random_string(10) - assert router_name not in resource_manager._DUMMY_INVENTORY - resource_manager.import_new_router(new_router_fqdn=router_name, subscription_id=_random_string(10)) - assert router_name in resource_manager._DUMMY_INVENTORY - - -def test_new_lag(): - router_name = list(resource_manager._DUMMY_INVENTORY.keys())[0] - new_lags = { - resource_manager.next_lag(router_fqdn=router_name, subscription_id=_random_string(10)) for _ in range(10) - } - assert len(new_lags) == 10 - assert new_lags <= set(resource_manager._DUMMY_INVENTORY[router_name]["lags"]) - - -def test_physical_allocation_lifecycle_happy(): - router_name = list(resource_manager._DUMMY_INVENTORY.keys())[0] - - def _interfaces(): - return resource_manager.available_physical_interfaces(router_fqdn=router_name) - - initial_available = _interfaces() - - interface_name = initial_available[0] - - resource_manager.reserve_physical_interface( - router_fqdn=router_name, subscription_id=_random_string(10), interface_name=interface_name - ) - - current_available = _interfaces() - assert interface_name not in current_available - - resource_manager.allocate_physical_interface(router_fqdn=router_name, interface_name=interface_name) - resource_manager.free_physical_interface(router_fqdn=router_name, interface_name=interface_name) - - current_available = _interfaces() - assert interface_name in current_available diff --git a/test/workflows/__init__.py b/test/workflows/__init__.py index 2b42a33928c6c9932b3512bd64b66d56267e9d28..320af9738138d715891bb6e1c575be41ea7df04f 100644 --- a/test/workflows/__init__.py +++ b/test/workflows/__init__.py @@ -2,7 +2,7 @@ import difflib import pprint from copy import deepcopy from itertools import chain, repeat -from typing import Callable, Dict, List, Optional, Tuple, Union, cast +from typing import Callable, Tuple, cast from uuid import uuid4 import structlog @@ -64,7 +64,7 @@ def assert_state(result, expected): assert expected == actual, f"Invalid state. Expected superset of: {expected}, but was: {actual}" -def assert_state_equal(result: ProcessTable, expected: Dict, excluded_keys: Optional[List[str]] = None) -> None: +def assert_state_equal(result: ProcessTable, expected: dict, excluded_keys: list[str] | None = None) -> None: """Test state with certain keys excluded from both actual and expected state.""" if excluded_keys is None: excluded_keys = ["process_id", "workflow_target", "workflow_name"] @@ -143,7 +143,7 @@ class WorkflowInstanceForTests(LazyWorkflowInstance): return f"WorkflowInstanceForTests('{self.workflow}','{self.name}')" -def _store_step(step_log: List[Tuple[Step, WFProcess]]) -> Callable[[ProcessStat, Step, WFProcess], WFProcess]: +def _store_step(step_log: list[Tuple[Step, WFProcess]]) -> Callable[[ProcessStat, Step, WFProcess], WFProcess]: def __store_step(pstat: ProcessStat, step: Step, state: WFProcess) -> WFProcess: try: state = state.map(lambda s: json_loads(json_dumps(s))) @@ -155,23 +155,23 @@ def _store_step(step_log: List[Tuple[Step, WFProcess]]) -> Callable[[ProcessStat return __store_step -def _sanitize_input(input_data: Union[State, List[State]]) -> List[State]: +def _sanitize_input(input_data: State | list[State]) -> list[State]: # To be backwards compatible convert single dict to list - if not isinstance(input_data, List): + if not isinstance(input_data, list): input_data = [input_data] # We need a copy here and we want to mimic the actual code that returns a serialized version of the state - return cast(List[State], json_loads(json_dumps(input_data))) + return cast(list[State], json_loads(json_dumps(input_data))) -def run_workflow(workflow_key: str, input_data: Union[State, List[State]]) -> Tuple[WFProcess, ProcessStat, List]: +def run_workflow(workflow_key: str, input_data: State | list[State]) -> Tuple[WFProcess, ProcessStat, list]: # ATTENTION!! This code needs to be as similar as possible to `server.services.processes.start_process` # The main differences are: we use a different step log function and we don't run in # a sepperate thread user_data = _sanitize_input(input_data) user = "john.doe" - step_log: List[Tuple[Step, WFProcess]] = [] + step_log: list[Tuple[Step, WFProcess]] = [] process_id = uuid4() workflow = get_workflow(workflow_key) @@ -201,8 +201,8 @@ def run_workflow(workflow_key: str, input_data: Union[State, List[State]]) -> Tu def resume_workflow( - process: ProcessStat, step_log: List[Tuple[Step, WFProcess]], input_data: State -) -> Tuple[WFProcess, List]: + process: ProcessStat, step_log: list[Tuple[Step, WFProcess]], input_data: State +) -> Tuple[WFProcess, list]: # ATTENTION!! This code needs to be as similar as possible to `server.services.processes.resume_process` # The main differences are: we use a different step log function, and we don't run in a separate thread user_data = _sanitize_input(input_data) @@ -228,8 +228,8 @@ def resume_workflow( def run_form_generator( - form_generator: FormGenerator, extra_inputs: Optional[List[State]] = None -) -> Tuple[List[dict], State]: + form_generator: FormGenerator, extra_inputs: list[State] | None = None +) -> Tuple[list[dict], State]: """Run a form generator to get the resulting forms and result. Warning! This does not run the actual pydantic validation on purpose. However, you should @@ -239,13 +239,13 @@ def run_form_generator( Args: ---- form_generator (FormGenerator): The form generator that will be run. - extra_inputs (Optional[List[State]]): List of user input dicts for each page in the generator. + extra_inputs (list[State] | None): list of user input dicts for each page in the generator. If no input is given for a page, an empty dict is used. The default value from the form is used as the default value for a field. Returns: ------- - Tuple[List[dict], State]: A list of generated forms and the result state for the whole generator. + Tuple[list[dict], State]: A list of generated forms and the result state for the whole generator. Example: ------- @@ -276,7 +276,7 @@ def run_form_generator( {'field': 'baz', 'bar': 42} """ - forms: List[dict] = [] + forms: list[dict] = [] result: State = {"s": 3} if extra_inputs is None: extra_inputs = []