Newer
Older
from typing import Any
from uuid import UUID
from orchestrator.db import (
ProductTable,
ResourceTypeTable,
SubscriptionInstanceTable,
SubscriptionInstanceValueTable,
SubscriptionTable,
)
from orchestrator.types import SubscriptionLifecycle
from gso.products import ProductType
SubscriptionType = dict[str, Any]
def get_active_subscriptions(
product_type: str,
includes: list[str] | None = None,
excludes: list[str] | None = None,
) -> list[SubscriptionType]:
"""Retrieve active subscriptions for a specific product type.
:param product_type: The type of the product for which to retrieve subscriptions.
:type product_type: str
:param includes: List of fields to be included in the returned Subscription objects.
:type includes: list[str]
:param excludes: List of fields to be excluded from the returned Subscription objects.
:type excludes: list[str]
:return: A list of Subscription objects that match the query.
:rtype: list[Subscription]
if not includes:
includes = [col.name for col in SubscriptionTable.__table__.columns]
if excludes:
includes = [field for field in includes if field not in excludes]
dynamic_fields = [getattr(SubscriptionTable, field) for field in includes]
query = SubscriptionTable.query.join(ProductTable).filter(
ProductTable.product_type == product_type,
SubscriptionTable.status == SubscriptionLifecycle.ACTIVE,
results = query.with_entities(*dynamic_fields).all()
return [dict(zip(includes, result)) for result in results]
def get_active_site_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
"""Retrieve active subscriptions specifically for sites.
:param includes: The fields to be included in the returned Subscription objects.
:type includes: list[str]
:return: A list of Subscription objects for sites.
:rtype: list[Subscription]
return get_active_subscriptions(product_type=ProductType.SITE, includes=includes)
def get_active_router_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
"""Retrieve active subscriptions specifically for routers.
:param includes: The fields to be included in the returned Subscription objects.
:type includes: list[str]
:return: A list of Subscription objects for routers.
:rtype: list[Subscription]
return get_active_subscriptions(product_type=ProductType.ROUTER, includes=includes)
def get_product_id_by_name(product_name: ProductType) -> UUID:
"""Retrieve the :term:`UUID` of a product by its name.
:param product_name: The name of the product.
:type product_name: ProductType
:return UUID: The :term:`UUID` of the product.
:rtype: UUID
"""
return ProductTable.query.filter_by(name=product_name).first().product_id
def get_active_subscriptions_by_field_and_value(field_name: str, field_value: str) -> list[SubscriptionTable]:
"""Retrieve a list of active subscriptions based on a specified field and its value.
:param field_name: The name of the field to filter by.
:type field_name: str
:param field_value: The value of the field to match against.
:type field_value: Any
:return: A list of active Subscription objects that match the criteria.
SubscriptionTable.query.join(ProductTable)
.join(SubscriptionInstanceTable)
.join(SubscriptionInstanceValueTable)
.join(ResourceTypeTable)
.filter(SubscriptionInstanceValueTable.value == field_value)
.filter(ResourceTypeTable.resource_type == field_name)
.filter(SubscriptionTable.status == SubscriptionLifecycle.ACTIVE)
def count_incomplete_validate_products() -> int:
"""Count the number of incomplete validate_products processes.
Returns
-------
int
The count of incomplete 'validate_products' processes.
"""
return ProcessTable.query.filter(
ProcessTable.workflow_name == "validate_products", ProcessTable.last_status != "completed"
).count()
def get_insync_subscriptions() -> list[SubscriptionTable]:
"""Retrieve all subscriptions that are currently in sync."""
return SubscriptionTable.query.join(ProductTable).filter(SubscriptionTable.insync.is_(True)).all()