"""A collection of methods that make interaction with coreDB more straight-forward. This prevents someone from having to re-write database statements many times, that might turn out to be erroneous or inconsistent when not careful. """ from typing import Any from uuid import UUID from orchestrator.db import ( ProcessTable, ProductTable, ResourceTypeTable, SubscriptionInstanceTable, SubscriptionInstanceValueTable, SubscriptionTable, ) from orchestrator.services.subscriptions import query_in_use_by_subscriptions from orchestrator.types import SubscriptionLifecycle from pydantic_forms.types import UUIDstr from gso.products import ProductName, ProductType from gso.products.product_types.site import Site SubscriptionType = dict[str, Any] def get_subscriptions( product_types: list[ProductType], lifecycles: list[SubscriptionLifecycle] | None = None, includes: list[str] | None = None, excludes: list[str] | None = None, ) -> list[SubscriptionType]: """Retrieve active subscriptions for a specific product type. :param list[ProductName] product_types: The types of the product for which to retrieve subscriptions. :param SubscriptionLifecycle lifecycles: The lifecycles that the products must be in. :param list[str] includes: List of fields to be included in the returned Subscription objects. :param list[str] excludes: List of fields to be excluded from the returned Subscription objects. :return: A list of Subscription objects that match the query. :rtype: list[Subscription] """ if not lifecycles: lifecycles = list(SubscriptionLifecycle) if not includes: includes = [col.name for col in SubscriptionTable.__table__.columns] if excludes: includes = [field for field in includes if field not in excludes] dynamic_fields = [getattr(SubscriptionTable, field) for field in includes] query = SubscriptionTable.query.join(ProductTable).filter( ProductTable.product_type.in_([str(product_type) for product_type in product_types]), SubscriptionTable.status.in_([str(lifecycle) for lifecycle in lifecycles]), ) results = query.with_entities(*dynamic_fields).all() return [dict(zip(includes, result, strict=True)) for result in results] def get_active_site_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]: """Retrieve active subscriptions specifically for sites. :param includes: The fields to be included in the returned Subscription objects. :type includes: list[str] :return: A list of Subscription objects for sites. :rtype: list[Subscription] """ return get_subscriptions( product_types=[ProductType.SITE], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes ) def get_router_subscriptions( includes: list[str] | None = None, lifecycles: list[SubscriptionLifecycle] | None = None ) -> list[SubscriptionType]: """Retrieve subscriptions specifically for routers. :param includes: The fields to be included in the returned Subscription objects. :type includes: list[str] :return: A list of Subscription objects for routers. :rtype: list[Subscription] """ return get_subscriptions(product_types=[ProductType.ROUTER], lifecycles=lifecycles, includes=includes) def get_active_router_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]: """Retrieve active subscriptions specifically for routers. :param includes: The fields to be included in the returned Subscription objects. :type includes: list[str] :return: A list of Subscription objects for routers. :rtype: list[Subscription] """ return get_subscriptions( product_types=[ProductType.ROUTER], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes ) def get_provisioning_router_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]: """Retrieve provisioning subscriptions specifically for routers. :param list[str] includes: The fields to be included in the returned Subscription objects. :return list[Subscription]: A list of router Subscription objects. """ return get_subscriptions( product_types=[ProductType.ROUTER], lifecycles=[SubscriptionLifecycle.PROVISIONING], includes=includes ) def get_active_iptrunk_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]: """Retrieve active subscriptions specifically for IP trunks. :param includes: The fields to be included in the returned Subscription objects. :type includes: list[str] :return: A list of Subscription objects for IP trunks. :rtype: list[Subscription] """ return get_subscriptions( product_types=[ProductType.IP_TRUNK], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes ) def get_trunks_that_terminate_on_router( subscription_id: UUIDstr, lifecycle_state: SubscriptionLifecycle ) -> list[SubscriptionTable]: """Get all IP trunk subscriptions that terminate on the given ``subscription_id`` of a Router. Given a ``subscription_id`` of a Router subscription, this method gives a list of all IP trunk subscriptions that terminate on this Router. The given lifecycle state dictates the state of trunk subscriptions that are counted as terminating on this router. :param UUIDstr subscription_id: Subscription ID of a Router :param SubscriptionLifecycle lifecycle_state: Required lifecycle state of the IP trunk :return: A list of IP trunk subscriptions :rtype: list[SubscriptionTable] """ return ( query_in_use_by_subscriptions(UUID(subscription_id)) .join(ProductTable) .filter( ProductTable.product_type == ProductType.IP_TRUNK, SubscriptionTable.status == lifecycle_state, ) .all() ) def get_product_id_by_name(product_name: ProductName) -> UUID: """Retrieve the :term:`UUID` of a product by its name. :param product_name: The name of the product. :type product_name: ProductName :return UUID: The :term:`UUID` of the product. :rtype: UUID """ return ProductTable.query.filter_by(name=product_name).first().product_id def get_active_subscriptions_by_field_and_value(field_name: str, field_value: str) -> list[SubscriptionTable]: """Retrieve a list of active subscriptions based on a specified field and its value. :param field_name: The name of the field to filter by. :type field_name: str :param field_value: The value of the field to match. :type field_value: Any :return: A list of active Subscription objects that match the criteria. :rtype: List[SubscriptionTable] """ return ( SubscriptionTable.query.join(ProductTable) .join(SubscriptionInstanceTable) .join(SubscriptionInstanceValueTable) .join(ResourceTypeTable) .filter(SubscriptionInstanceValueTable.value == field_value) .filter(ResourceTypeTable.resource_type == field_name) .filter(SubscriptionTable.status == SubscriptionLifecycle.ACTIVE) .all() ) def count_incomplete_validate_products() -> int: """Count the number of incomplete validate_products processes. :return: The count of incomplete 'validate_products' processes. :rtype: int """ return ProcessTable.query.filter( ProcessTable.workflow_name == "validate_products", ProcessTable.last_status != "completed", ).count() def get_insync_subscriptions() -> list[SubscriptionTable]: """Retrieve all subscriptions that are currently in sync.""" return SubscriptionTable.query.join(ProductTable).filter(SubscriptionTable.insync.is_(True)).all() def get_site_by_name(site_name: str) -> Site: """Get a site by its name. :param site_name: The name of the site. :type site_name: str """ subscription = get_active_subscriptions_by_field_and_value("site_name", site_name) if not subscription: msg = f"Site with name {site_name} not found." raise ValueError(msg) return Site.from_subscription(subscription[0].subscription_id)