"""A collection of methods that make interaction with coreDB more straightforward.

This saves callers from rewriting the same database statements many times, which could easily become erroneous or
inconsistent if done carelessly.
"""

from typing import Any
from uuid import UUID

from orchestrator.db import (
    ProcessTable,
    ProductTable,
    ResourceTypeTable,
    SubscriptionInstanceTable,
    SubscriptionInstanceValueTable,
    SubscriptionTable,
)
from orchestrator.services.subscriptions import query_in_use_by_subscriptions
from orchestrator.types import SubscriptionLifecycle, UUIDstr

from gso.products import ProductName, ProductType
from gso.products.product_types.site import Site
# Plain-dictionary representation of a subscription row: maps a field name to that field's value.
SubscriptionType = dict[str, Any]
def get_subscriptions(
    product_types: list[ProductType],
    lifecycles: list[SubscriptionLifecycle] | None = None,
    includes: list[str] | None = None,
    excludes: list[str] | None = None,
) -> list[SubscriptionType]:
    """Retrieve subscriptions for the given product types, optionally filtered by lifecycle state.

    :param list[ProductType] product_types: The types of the product for which to retrieve subscriptions.
    :param list[SubscriptionLifecycle] lifecycles: The lifecycles that the subscriptions must be in. When empty or
        omitted, subscriptions in every lifecycle state are returned.
    :param list[str] includes: List of fields to be included in the returned Subscription objects. When empty or
        omitted, all columns of ``SubscriptionTable`` are included.
    :param list[str] excludes: List of fields to be excluded from the returned Subscription objects.

    :return: A list of dictionaries, one per matching subscription, keyed by the selected field names.
    :rtype: list[SubscriptionType]
    """
    if not lifecycles:
        lifecycles = list(SubscriptionLifecycle)

    if not includes:
        includes = [col.name for col in SubscriptionTable.__table__.columns]

    if excludes:
        includes = [field for field in includes if field not in excludes]

    # Resolve the field names to column attributes, so that only those columns are selected.
    dynamic_fields = [getattr(SubscriptionTable, field) for field in includes]

    query = SubscriptionTable.query.join(ProductTable).filter(
        ProductTable.product_type.in_([str(product_type) for product_type in product_types]),
        SubscriptionTable.status.in_([str(lifecycle) for lifecycle in lifecycles]),
    )

    results = query.with_entities(*dynamic_fields).all()

    # Each result row holds the values in the same order as ``includes``; ``strict`` guards against a mismatch.
    return [dict(zip(includes, result, strict=True)) for result in results]
def get_active_site_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
    """Retrieve active subscriptions specifically for sites.

    :param includes: The fields to be included in the returned Subscription objects.
    :type includes: list[str]

    :return: A list of Subscription objects for sites.
    :rtype: list[SubscriptionType]
    """
    return get_subscriptions(
        product_types=[ProductType.SITE], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes
    )


def get_router_subscriptions(
    includes: list[str] | None = None, lifecycles: list[SubscriptionLifecycle] | None = None
) -> list[SubscriptionType]:
    """Retrieve router subscriptions, optionally restricted to certain lifecycle states.

    :param includes: The fields to be included in the returned Subscription objects.
    :type includes: list[str]
    :param lifecycles: The lifecycle states to filter on; passed on unchanged to :func:`get_subscriptions`.
    :type lifecycles: list[SubscriptionLifecycle]

    :return: A list of Subscription objects for routers.
    :rtype: list[SubscriptionType]
    """
    router_products = [ProductType.ROUTER]
    return get_subscriptions(product_types=router_products, includes=includes, lifecycles=lifecycles)
def get_active_router_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
    """Retrieve active subscriptions specifically for routers.

    :param includes: The fields to be included in the returned Subscription objects.
    :type includes: list[str]

    :return: A list of Subscription objects for routers.
    :rtype: list[SubscriptionType]
    """
    return get_subscriptions(
        product_types=[ProductType.ROUTER], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes
    )
def get_provisioning_router_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
    """Retrieve router subscriptions that are in the provisioning lifecycle state.

    :param list[str] includes: The fields to be included in the returned Subscription objects.
    :return list[SubscriptionType]: A list of router Subscription objects.
    """
    provisioning_only = [SubscriptionLifecycle.PROVISIONING]
    return get_subscriptions(product_types=[ProductType.ROUTER], lifecycles=provisioning_only, includes=includes)


def get_active_iptrunk_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
    """Retrieve IP trunk subscriptions that are in the active lifecycle state.

    :param includes: The fields to be included in the returned Subscription objects.
    :type includes: list[str]

    :return: A list of Subscription objects for IP trunks.
    :rtype: list[SubscriptionType]
    """
    active_only = [SubscriptionLifecycle.ACTIVE]
    return get_subscriptions(product_types=[ProductType.IP_TRUNK], lifecycles=active_only, includes=includes)
def get_trunks_that_terminate_on_router(
    subscription_id: UUIDstr, lifecycle_state: SubscriptionLifecycle
) -> list[SubscriptionTable]:
    """Get all IP trunk subscriptions that terminate on the given ``subscription_id`` of a Router.

    Given a ``subscription_id`` of a Router subscription, this method gives a list of all IP trunk subscriptions that
    terminate on this Router. The given lifecycle state dictates the state of trunk subscriptions that are counted as
    terminating on this router.

    :param UUIDstr subscription_id: Subscription ID of a Router
    :param SubscriptionLifecycle lifecycle_state: Required lifecycle state of the IP trunk

    :return: A list of IP trunk subscriptions
    :rtype: list[SubscriptionTable]
    """
    # The subscription ID arrives as a string and is converted to a ``UUID`` before it is handed to the query helper.
    return (
        query_in_use_by_subscriptions(UUID(subscription_id))
        .join(ProductTable)
        .filter(
            ProductTable.product_type == ProductType.IP_TRUNK,
            SubscriptionTable.status == lifecycle_state,
        )
        .all()
    )
def get_product_id_by_name(product_name: ProductName) -> UUID:
    """Look up the :term:`UUID` of the product that carries the given name.

    The first product row whose ``name`` equals ``product_name`` is used. No check is made for a missing product:
    when the query yields no row, reading ``product_id`` from the empty result fails with an ``AttributeError``.

    :param product_name: The name of the product.
    :type product_name: ProductName

    :return: The :term:`UUID` of the product.
    :rtype: UUID
    """
    matching_product = ProductTable.query.filter_by(name=product_name).first()
    return matching_product.product_id


def get_active_subscriptions_by_field_and_value(field_name: str, field_value: str) -> list[SubscriptionTable]:
    """Retrieve a list of active subscriptions based on a specified field and its value.

    A subscription matches when it is in the active lifecycle state and has a subscription instance value equal to
    ``field_value`` whose resource type is named ``field_name``.

    :param field_name: The name of the field (resource type) to filter by.
    :type field_name: str
    :param field_value: The value of the field to match.
    :type field_value: str

    :return: A list of active Subscription objects that match the criteria.
    :rtype: list[SubscriptionTable]
    """
    return (
        SubscriptionTable.query.join(ProductTable)
        .join(SubscriptionInstanceTable)
        .join(SubscriptionInstanceValueTable)
        .join(ResourceTypeTable)
        .filter(SubscriptionInstanceValueTable.value == field_value)
        .filter(ResourceTypeTable.resource_type == field_name)
        .filter(SubscriptionTable.status == SubscriptionLifecycle.ACTIVE)
        .all()
    )


def count_incomplete_validate_products() -> int:
    """Count the ``validate_products`` processes whose last status is not ``completed``.

    :return: The number of processes with workflow name 'validate_products' and a last status other than 'completed'.
    :rtype: int
    """
    is_validate_products = ProcessTable.workflow_name == "validate_products"
    is_not_completed = ProcessTable.last_status != "completed"
    return ProcessTable.query.filter(is_validate_products, is_not_completed).count()


def get_insync_subscriptions() -> list[SubscriptionTable]:
    """Retrieve all subscriptions whose ``insync`` flag is set.

    :return: A list of all subscriptions, joined with their product, for which ``insync`` is true.
    :rtype: list[SubscriptionTable]
    """
    insync_query = SubscriptionTable.query.join(ProductTable).filter(SubscriptionTable.insync.is_(True))
    return insync_query.all()


def get_site_by_name(site_name: str) -> Site:
    """Look up the active site subscription that carries the given site name.

    When several active subscriptions share the name, the first one returned by the query is used.

    :param site_name: The name of the site.
    :type site_name: str

    :return: The site loaded from the first matching subscription.
    :rtype: Site
    :raises ValueError: If no active subscription has a ``site_name`` equal to the given name.
    """
    matches = get_active_subscriptions_by_field_and_value("site_name", site_name)
    if matches:
        return Site.from_subscription(matches[0].subscription_id)

    msg = f"Site with name {site_name} not found."
    raise ValueError(msg)