Newer
Older
"""A collection of methods that make interaction with coreDB more straight-forward.
This prevents someone from having to re-write database statements many times, that might turn out to be erroneous
or inconsistent when not careful.
"""
from typing import Any
from uuid import UUID
from orchestrator.db import (
ProductTable,
ResourceTypeTable,
SubscriptionInstanceTable,
SubscriptionInstanceValueTable,
SubscriptionTable,
)
Karel van Klink
committed
from orchestrator.services.subscriptions import query_in_use_by_subscriptions
from orchestrator.types import SubscriptionLifecycle
Karel van Klink
committed
from pydantic_forms.types import UUIDstr
from gso.products import ProductType
SubscriptionType = dict[str, Any]
def get_active_subscriptions(
product_type: str,
includes: list[str] | None = None,
excludes: list[str] | None = None,
) -> list[SubscriptionType]:
"""Retrieve active subscriptions for a specific product type.
:param product_type: The type of the product for which to retrieve subscriptions.
:type product_type: str
:param includes: List of fields to be included in the returned Subscription objects.
:type includes: list[str]
:param excludes: List of fields to be excluded from the returned Subscription objects.
:type excludes: list[str]
:return: A list of Subscription objects that match the query.
:rtype: list[Subscription]
if not includes:
includes = [col.name for col in SubscriptionTable.__table__.columns]
if excludes:
includes = [field for field in includes if field not in excludes]
dynamic_fields = [getattr(SubscriptionTable, field) for field in includes]
query = SubscriptionTable.query.join(ProductTable).filter(
ProductTable.product_type == product_type,
SubscriptionTable.status == SubscriptionLifecycle.ACTIVE,
results = query.with_entities(*dynamic_fields).all()
Karel van Klink
committed
return [dict(zip(includes, result, strict=True)) for result in results]
Karel van Klink
committed
def get_active_site_subscriptions(
includes: list[str] | None = None,
) -> list[SubscriptionType]:
"""Retrieve active subscriptions specifically for sites.
:param includes: The fields to be included in the returned Subscription objects.
:type includes: list[str]
:return: A list of Subscription objects for sites.
:rtype: list[Subscription]
return get_active_subscriptions(product_type=ProductType.SITE, includes=includes)
Karel van Klink
committed
def get_active_router_subscriptions(
includes: list[str] | None = None,
) -> list[SubscriptionType]:
"""Retrieve active subscriptions specifically for routers.
:param includes: The fields to be included in the returned Subscription objects.
:type includes: list[str]
:return: A list of Subscription objects for routers.
:rtype: list[Subscription]
Karel van Klink
committed
return get_active_subscriptions(product_type="Router", includes=includes)
Karel van Klink
committed
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def get_active_iptrunk_subscriptions(
includes: list[str] | None = None,
) -> list[SubscriptionType]:
"""Retrieve active subscriptions specifically for IP trunks.
:param includes: The fields to be included in the returned Subscription objects.
:type includes: list[str]
:return: A list of Subscription objects for IP trunks.
:rtype: list[Subscription]
"""
return get_active_subscriptions(product_type="Iptrunk", includes=includes)
def get_active_trunks_that_terminate_on_router(subscription_id: UUIDstr) -> list[SubscriptionTable]:
"""Get all IP trunk subscriptions that are active, and terminate on the given ``subscription_id`` of a Router.
Given a ``subscription_id`` of a Router subscription, this method gives a list of all active IP trunk subscriptions
that terminate on this Router.
:param subscription_id: Subscription ID of a Router
:type subscription_id: UUIDstr
:return: A list of IP trunk subscriptions
:rtype: list[SubscriptionTable]
"""
return query_in_use_by_subscriptions(UUID(subscription_id)).join(ProductTable).filter(
ProductTable.product_type == "Iptrunk",
Karel van Klink
committed
).all()
def get_product_id_by_name(product_name: ProductType) -> UUID:
"""Retrieve the :term:`UUID` of a product by its name.
:param product_name: The name of the product.
:type product_name: ProductType
:return UUID: The :term:`UUID` of the product.
:rtype: UUID
"""
return ProductTable.query.filter_by(name=product_name).first().product_id
def get_active_subscriptions_by_field_and_value(field_name: str, field_value: str) -> list[SubscriptionTable]:
"""Retrieve a list of active subscriptions based on a specified field and its value.
:param field_name: The name of the field to filter by.
:type field_name: str
:param field_value: The value of the field to match.
:type field_value: Any
:return: A list of active Subscription objects that match the criteria.
SubscriptionTable.query.join(ProductTable)
.join(SubscriptionInstanceTable)
.join(SubscriptionInstanceValueTable)
.join(ResourceTypeTable)
.filter(SubscriptionInstanceValueTable.value == field_value)
.filter(ResourceTypeTable.resource_type == field_name)
.filter(SubscriptionTable.status == SubscriptionLifecycle.ACTIVE)
def count_incomplete_validate_products() -> int:
"""Count the number of incomplete validate_products processes.
:return: The count of incomplete 'validate_products' processes.
:rtype: int
Karel van Klink
committed
ProcessTable.workflow_name == "validate_products",
ProcessTable.last_status != "completed",
).count()
def get_insync_subscriptions() -> list[SubscriptionTable]:
"""Retrieve all subscriptions that are currently in sync."""
return SubscriptionTable.query.join(ProductTable).filter(SubscriptionTable.insync.is_(True)).all()