-
Karel van Klink authoredKarel van Klink authored
delete_partners.py 2.11 KiB
"""A delete workflow that remove a partner."""
from enum import Enum
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, UUIDstr
from orchestrator.workflow import StepList, begin, done, step, workflow
from pydantic import ConfigDict, EmailStr, field_validator
from pydantic_forms.validators import Choice
from gso.services.partners import delete_partner, get_all_partners, get_partner_by_name
from gso.services.subscriptions import get_subscriptions
def initial_input_form_generator() -> FormGenerator:
    """Collect the partner that should be deleted.

    A single form page is presented with one choice field listing every partner returned by
    ``get_all_partners``. The selection is rejected when ``get_subscriptions`` returns anything for it.

    Yields:
        The form page class used to collect the partner selection.

    Returns:
        A dict holding the ``email``, ``name`` and ``partner_id`` of the selected partner, as looked up
        through ``get_partner_by_name``.
    """
    partner_names_by_id = {entry["partner_id"]: entry["name"] for entry in get_all_partners()}

    # Pair every partner name with its (partner_id, name) item; the name becomes the member name.
    choice_members = zip(partner_names_by_id.values(), partner_names_by_id.items(), strict=True)
    partner_choice = Choice("Select a partner", choice_members)  # type: ignore[arg-type]

    class SelectPartnerForm(FormPage):
        model_config = ConfigDict(title="Delete a Partner")

        partners: partner_choice  # type: ignore[valid-type]

        @field_validator("partners")
        def validate_partners(cls, value: Enum) -> Enum:
            """Accept the selection only when no subscriptions are found for it."""
            if not get_subscriptions(partner_id=str(value)):
                return value

            msg = "This partner has associated data and cannot be removed."
            raise ValueError(msg)

    initial_user_input = yield SelectPartnerForm

    # The enum member's name carries the partner name, which is used for the lookup.
    selected_name = initial_user_input.partners.name
    partner_record = get_partner_by_name(name=selected_name)

    return {key: partner_record[key] for key in ("email", "name", "partner_id")}
@step("Delete partner information from database")
def delete_partner_from_database(
partner_id: UUIDstr,
name: str,
email: EmailStr,
) -> State:
"""Delete a partner from database."""
delete_partner(partner_id=partner_id)
return {"deleted_partner": {"name": name, "email": email, "partner_id": partner_id}}
@workflow("Delete partners", initial_input_form=initial_input_form_generator, target=Target.SYSTEM)
def task_delete_partners() -> StepList:
    """Build the step list for deleting a partner.

    Returns:
        A step list that runs ``delete_partner_from_database`` between ``begin`` and ``done``.
    """
    deletion_steps = begin >> delete_partner_from_database
    return deletion_steps >> done