-
Mohammad Torkashvand authoredMohammad Torkashvand authored
subscriptions.py 9.33 KiB
"""A collection of methods that make interaction with coreDB more straight-forward.
This prevents someone from having to re-write database statements many times, that might turn out to be erroneous
or inconsistent when not careful.
"""
from typing import Any
from uuid import UUID
from orchestrator.db import (
ProcessSubscriptionTable,
ProcessTable,
ProductTable,
ResourceTypeTable,
SubscriptionInstanceTable,
SubscriptionInstanceValueTable,
SubscriptionTable,
db,
)
from orchestrator.domain import SubscriptionModel
from orchestrator.services.subscriptions import query_in_use_by_subscriptions
from orchestrator.types import SubscriptionLifecycle
from orchestrator.workflow import ProcessStatus
from pydantic_forms.types import UUIDstr
from gso.products import ProductName, ProductType
from gso.products.product_types.site import Site
SubscriptionType = dict[str, Any]
def get_subscriptions(
product_types: list[ProductType] | None = None,
lifecycles: list[SubscriptionLifecycle] | None = None,
includes: list[str] | None = None,
excludes: list[str] | None = None,
partner_id: UUIDstr | None = None,
) -> list[SubscriptionType]:
"""Retrieve active subscriptions for a specific product type.
:param list[ProductName] product_types: The types of the product for which to retrieve subscriptions.
:param SubscriptionLifecycle lifecycles: The lifecycles that the products must be in.
:param list[str] includes: List of fields to be included in the returned Subscription objects.
:param list[str] excludes: List of fields to be excluded from the returned Subscription objects.
:param UUIDstr partner_id: The customer id of subscriptions.
:return: A list of Subscription objects that match the query.
:rtype: list[Subscription]
"""
if not includes:
includes = [col.name for col in SubscriptionTable.__table__.columns]
if excludes:
includes = [field for field in includes if field not in excludes]
dynamic_fields = [getattr(SubscriptionTable, field) for field in includes]
query = db.session.query(SubscriptionTable).join(ProductTable)
if product_types:
query = query.filter(ProductTable.product_type.in_([str(product_type) for product_type in product_types]))
if lifecycles:
query = query.filter(SubscriptionTable.status.in_([str(lifecycle) for lifecycle in lifecycles]))
if partner_id:
query = query.filter(SubscriptionTable.customer_id == partner_id)
results = query.with_entities(*dynamic_fields).all()
return [dict(zip(includes, result, strict=True)) for result in results]
def get_active_site_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
"""Retrieve active subscriptions specifically for sites.
:param includes: The fields to be included in the returned Subscription objects.
:type includes: list[str]
:return: A list of Subscription objects for sites.
:rtype: list[Subscription]
"""
return get_subscriptions(
product_types=[ProductType.SITE], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes
)
def get_router_subscriptions(
includes: list[str] | None = None, lifecycles: list[SubscriptionLifecycle] | None = None
) -> list[SubscriptionType]:
"""Retrieve subscriptions specifically for routers.
:param includes: The fields to be included in the returned Subscription objects.
:type includes: list[str]
:return: A list of Subscription objects for routers.
:rtype: list[Subscription]
"""
return get_subscriptions(product_types=[ProductType.ROUTER], lifecycles=lifecycles, includes=includes)
def get_active_router_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
"""Retrieve active subscriptions specifically for routers.
:param includes: The fields to be included in the returned Subscription objects.
:type includes: list[str]
:return: A list of Subscription objects for routers.
:rtype: list[Subscription]
"""
return get_subscriptions(
product_types=[ProductType.ROUTER], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes
)
def get_provisioning_router_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
"""Retrieve provisioning subscriptions specifically for routers.
:param list[str] includes: The fields to be included in the returned Subscription objects.
:return list[Subscription]: A list of router Subscription objects.
"""
return get_subscriptions(
product_types=[ProductType.ROUTER], lifecycles=[SubscriptionLifecycle.PROVISIONING], includes=includes
)
def get_active_iptrunk_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
"""Retrieve active subscriptions specifically for IP trunks.
:param includes: The fields to be included in the returned Subscription objects.
:type includes: list[str]
:return: A list of Subscription objects for IP trunks.
:rtype: list[Subscription]
"""
return get_subscriptions(
product_types=[ProductType.IP_TRUNK], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes
)
def get_trunks_that_terminate_on_router(
subscription_id: UUIDstr, lifecycle_state: SubscriptionLifecycle
) -> list[SubscriptionTable]:
"""Get all IP trunk subscriptions that terminate on the given ``subscription_id`` of a Router.
Given a ``subscription_id`` of a Router subscription, this method gives a list of all IP trunk subscriptions that
terminate on this Router. The given lifecycle state dictates the state of trunk subscriptions that are counted as
terminating on this router.
:param UUIDstr subscription_id: Subscription ID of a Router
:param SubscriptionLifecycle lifecycle_state: Required lifecycle state of the IP trunk
:return: A list of IP trunk subscriptions
:rtype: list[SubscriptionTable]
"""
return (
query_in_use_by_subscriptions(UUID(subscription_id))
.join(ProductTable)
.filter(
ProductTable.product_type == ProductType.IP_TRUNK,
SubscriptionTable.status == lifecycle_state,
)
.all()
)
def get_product_id_by_name(product_name: ProductName) -> UUID:
"""Retrieve the :term:`UUID` of a product by its name.
:param product_name: The name of the product.
:type product_name: ProductName
:return UUID: The :term:`UUID` of the product.
:rtype: UUID
"""
return ProductTable.query.filter_by(name=product_name).first().product_id
def get_active_subscriptions_by_field_and_value(field_name: str, field_value: str) -> list[SubscriptionTable]:
"""Retrieve a list of active subscriptions based on a specified field and its value.
:param field_name: The name of the field to filter by.
:type field_name: str
:param field_value: The value of the field to match.
:type field_value: Any
:return: A list of active Subscription objects that match the criteria.
:rtype: List[SubscriptionTable]
"""
return (
SubscriptionTable.query.join(ProductTable)
.join(SubscriptionInstanceTable)
.join(SubscriptionInstanceValueTable)
.join(ResourceTypeTable)
.filter(SubscriptionInstanceValueTable.value == field_value)
.filter(ResourceTypeTable.resource_type == field_name)
.filter(SubscriptionTable.status == SubscriptionLifecycle.ACTIVE)
.all()
)
def count_incomplete_validate_products() -> int:
"""Count the number of incomplete validate_geant_products processes.
:return: The count of incomplete 'validate_geant_products' processes.
:rtype: int
"""
return ProcessTable.query.filter(
ProcessTable.workflow_name == "validate_geant_products",
ProcessTable.last_status != ProcessStatus.COMPLETED.value,
).count()
def get_failed_tasks() -> list[ProcessTable]:
"""Get all tasks that have failed."""
return ProcessTable.query.filter(
ProcessTable.is_task.is_(True), ProcessTable.last_status == ProcessStatus.FAILED.value
).all()
def get_subscription_by_process_id(process_id: str) -> SubscriptionModel | None:
"""Get a subscription from a process ID."""
subscription_table = ProcessSubscriptionTable.query.filter(
ProcessSubscriptionTable.process_id == process_id
).first()
return SubscriptionModel.from_subscription(subscription_table.subscription_id) if subscription_table else None
def get_insync_subscriptions() -> list[SubscriptionTable]:
"""Retrieve all subscriptions that are currently in sync."""
return SubscriptionTable.query.join(ProductTable).filter(SubscriptionTable.insync.is_(True)).all()
def get_active_insync_subscriptions() -> list[SubscriptionTable]:
"""Retrieve all subscriptions that are currently active and in sync."""
return (
SubscriptionTable.query.join(ProductTable)
.filter(SubscriptionTable.insync.is_(True), SubscriptionTable.status == SubscriptionLifecycle.ACTIVE.value)
.all()
)
def get_site_by_name(site_name: str) -> Site:
"""Get a site by its name.
:param site_name: The name of the site.
:type site_name: str
"""
subscription = get_active_subscriptions_by_field_and_value("site_name", site_name)
if not subscription:
msg = f"Site with name {site_name} not found."
raise ValueError(msg)
return Site.from_subscription(subscription[0].subscription_id)