Newer
Older
"""A collection of methods that make interaction with coreDB more straight-forward.
This prevents someone from having to re-write database statements many times, that might turn out to be erroneous
or inconsistent when not careful.
"""
from typing import Any
from uuid import UUID

from orchestrator.db import (
    ProcessTable,
    ProductTable,
    ResourceTypeTable,
    SubscriptionInstanceTable,
    SubscriptionInstanceValueTable,
    SubscriptionTable,
)
from orchestrator.services.subscriptions import query_in_use_by_subscriptions
from orchestrator.types import SubscriptionLifecycle
from pydantic_forms.types import UUIDstr

from gso.products import ProductName, ProductType
from gso.products.product_types.site import Site
# Plain-dictionary representation of a subscription: maps a column name to that column's value.
SubscriptionType = dict[str, Any]

def get_subscriptions(
    product_types: list[ProductType],
    lifecycles: list[SubscriptionLifecycle] | None = None,
    includes: list[str] | None = None,
    excludes: list[str] | None = None,
) -> list[SubscriptionType]:
    """Retrieve subscriptions for the given product types, as dictionaries of selected columns.

    :param list[ProductType] product_types: The product types for which to retrieve subscriptions.
    :param list[SubscriptionLifecycle] lifecycles: The lifecycle states a subscription may be in. When omitted or
        empty, every member of ``SubscriptionLifecycle`` is accepted.
    :param list[str] includes: Names of the ``SubscriptionTable`` columns to return. When omitted or empty, all
        columns of the table are returned.
    :param list[str] excludes: Names of columns to drop from ``includes``.
    :return: One dictionary per matching subscription, keyed by column name.
    :rtype: list[SubscriptionType]
    """
    if not lifecycles:
        lifecycles = list(SubscriptionLifecycle)

    if not includes:
        includes = [col.name for col in SubscriptionTable.__table__.columns]

    if excludes:
        includes = [field for field in includes if field not in excludes]

    # Resolve the column names to the mapped attributes so that only those columns are selected.
    dynamic_fields = [getattr(SubscriptionTable, field) for field in includes]

    query = SubscriptionTable.query.join(ProductTable).filter(
        ProductTable.product_type.in_([str(product_type) for product_type in product_types]),
        SubscriptionTable.status.in_([str(lifecycle) for lifecycle in lifecycles]),
    )

    results = query.with_entities(*dynamic_fields).all()

    # ``includes`` and each result row are in the same order; ``strict=True`` guards against a length mismatch.
    return [dict(zip(includes, result, strict=True)) for result in results]
def get_active_site_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
    """Retrieve active subscriptions specifically for sites.

    :param includes: The fields to be included in the returned Subscription objects.
    :type includes: list[str]
    :return: A list of Subscription objects for sites.
    :rtype: list[Subscription]
    """
    return get_subscriptions(
        product_types=[ProductType.SITE], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes
    )
def get_router_subscriptions(
    includes: list[str] | None = None, lifecycles: list[SubscriptionLifecycle] | None = None
) -> list[SubscriptionType]:
    """Fetch router subscriptions, optionally restricted to certain lifecycle states.

    :param includes: The fields to be included in the returned Subscription objects.
    :type includes: list[str]
    :param lifecycles: The lifecycle states to select; passed on unchanged to ``get_subscriptions``.
    :type lifecycles: list[SubscriptionLifecycle]
    :return: A list of Subscription objects for routers.
    :rtype: list[Subscription]
    """
    router_product_types = [ProductType.ROUTER]
    return get_subscriptions(product_types=router_product_types, lifecycles=lifecycles, includes=includes)
def get_active_router_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
    """Retrieve active subscriptions specifically for routers.

    :param includes: The fields to be included in the returned Subscription objects.
    :type includes: list[str]
    :return: A list of Subscription objects for routers.
    :rtype: list[Subscription]
    """
    return get_subscriptions(
        product_types=[ProductType.ROUTER], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes
    )
def get_provisioning_router_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
    """Retrieve provisioning subscriptions specifically for routers.

    :param includes: The fields to be included in the returned Subscription objects.
    :type includes: list[str]
    :return: A list of Subscription objects for routers in the provisioning state.
    :rtype: list[Subscription]
    """
    return get_subscriptions(
        product_types=[ProductType.ROUTER], lifecycles=[SubscriptionLifecycle.PROVISIONING], includes=includes
    )
def get_active_iptrunk_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
    """Retrieve active subscriptions specifically for IP trunks.

    :param includes: The fields to be included in the returned Subscription objects.
    :type includes: list[str]
    :return: A list of Subscription objects for IP trunks.
    :rtype: list[Subscription]
    """
    return get_subscriptions(
        product_types=[ProductType.IP_TRUNK], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes
    )
def get_trunks_that_terminate_on_router(
    subscription_id: UUIDstr, lifecycle_state: SubscriptionLifecycle
) -> list[SubscriptionTable]:
    """Get all IP trunk subscriptions that terminate on the given ``subscription_id`` of a Router.

    Given a ``subscription_id`` of a Router subscription, this method gives a list of all IP trunk subscriptions that
    terminate on this Router. The given lifecycle state dictates the state of trunk subscriptions that are counted as
    terminating on this router.

    :param UUIDstr subscription_id: Subscription ID of a Router
    :param SubscriptionLifecycle lifecycle_state: Required lifecycle state of the IP trunk
    :return: A list of IP trunk subscriptions
    :rtype: list[SubscriptionTable]
    """
    return (
        query_in_use_by_subscriptions(UUID(subscription_id))
        .join(ProductTable)
        .filter(
            ProductTable.product_type == ProductType.IP_TRUNK,
            SubscriptionTable.status == lifecycle_state,
        )
        # Materialise the query so the result matches the declared ``list`` return type.
        .all()
    )
def get_product_id_by_name(product_name: ProductName) -> UUID:
    """Retrieve the :term:`UUID` of a product by its name.

    :param product_name: The name of the product.
    :type product_name: ProductName
    :return: The :term:`UUID` of the product.
    :rtype: UUID
    """
    # NOTE(review): ``first()`` yields ``None`` when nothing matches, so an unknown name surfaces as an
    # ``AttributeError`` on ``.product_id`` - confirm that callers only pass names of existing products.
    return ProductTable.query.filter_by(name=product_name).first().product_id
def get_active_subscriptions_by_field_and_value(field_name: str, field_value: str) -> list[SubscriptionTable]:
    """Retrieve a list of active subscriptions based on a specified field and its value.

    :param field_name: The name of the field (resource type) to filter by.
    :type field_name: str
    :param field_value: The value of the field to match.
    :type field_value: str
    :return: A list of active Subscription objects that match the criteria.
    :rtype: list[SubscriptionTable]
    """
    return (
        SubscriptionTable.query.join(ProductTable)
        .join(SubscriptionInstanceTable)
        .join(SubscriptionInstanceValueTable)
        .join(ResourceTypeTable)
        .filter(SubscriptionInstanceValueTable.value == field_value)
        .filter(ResourceTypeTable.resource_type == field_name)
        .filter(SubscriptionTable.status == SubscriptionLifecycle.ACTIVE)
        # Materialise the query so callers receive a real list that is falsy when nothing matched.
        .all()
    )
def count_incomplete_validate_products() -> int:
    """Count the number of incomplete validate_products processes.

    :return: The count of 'validate_products' processes whose last status is not 'completed'.
    :rtype: int
    """
    return ProcessTable.query.filter(
        ProcessTable.workflow_name == "validate_products",
        ProcessTable.last_status != "completed",
    ).count()
def get_insync_subscriptions() -> list[SubscriptionTable]:
    """Fetch every subscription whose ``insync`` flag is set.

    :return: All subscriptions, joined with their product, that are currently in sync.
    :rtype: list[SubscriptionTable]
    """
    subscriptions_with_product = SubscriptionTable.query.join(ProductTable)
    in_sync_only = subscriptions_with_product.filter(SubscriptionTable.insync.is_(True))
    return in_sync_only.all()
def get_site_by_name(site_name: str) -> Site:
    """Look up the active site subscription that carries the given name.

    :param site_name: The name of the site.
    :type site_name: str
    :return: The site built from the first matching active subscription.
    :rtype: Site
    :raises ValueError: If no active subscription has a ``site_name`` equal to the given name.
    """
    matches = get_active_subscriptions_by_field_and_value("site_name", site_name)
    if matches:
        # Only the first match is used, even if several subscriptions share the name.
        return Site.from_subscription(matches[0].subscription_id)

    msg = f"Site with name {site_name} not found."
    raise ValueError(msg)