Skip to content
Snippets Groups Projects
Commit 0e984b92 authored by Mohammad Torkashvand's avatar Mohammad Torkashvand
Browse files

add task for partner creation,modification and deletion

parent 854fe158
No related branches found
No related tags found
1 merge request!238add tasks for partner creation, modification and deletion
Showing
with 722 additions and 98 deletions
"""Database model definitions and table mappings for the GSO system."""
import enum
import structlog
from orchestrator.db import UtcTimestamp
from orchestrator.db.database import BaseModel
from sqlalchemy import (
Enum,
String,
text,
)
from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.orm import mapped_column
logger = structlog.get_logger(__name__)
class PartnerType(str, enum.Enum):
"""Defining different types of partners in the GSO system."""
NREN = "NREN"
RE_PEER = "RE_PEER"
PUBLIC_PEER = "PUBLIC_PEER"
PRIVATE_PEER = "PRIVATE_PEER"
UPSTREAM = "UPSTREAM"
GEANT = "GEANT"
class PartnerTable(BaseModel):
"""Database table for the partners in the GSO system."""
__tablename__ = "partners"
partner_id = mapped_column(String, server_default=text("uuid_generate_v4"), primary_key=True)
name = mapped_column(String, unique=True, nullable=True)
name = mapped_column(String, unique=True, nullable=False)
email = mapped_column(String, unique=True, nullable=False)
partner_type = mapped_column(Enum(PartnerType), nullable=False)
as_number = mapped_column(
String, unique=True, nullable=True
) # the as_number and as_set are mutually exclusive. if you give me one I don't need the other
as_set = mapped_column(String, nullable=True)
route_set = mapped_column(String, nullable=True)
black_listed_as_sets = mapped_column(ARRAY(String), nullable=True)
additional_routers = mapped_column(ARRAY(String), nullable=True)
additional_bgp_speakers = mapped_column(ARRAY(String), nullable=True)
created_at = mapped_column(UtcTimestamp, server_default=text("current_timestamp"), nullable=False)
updated_at = mapped_column(
......
"""Add task_create_partners, task_modify_partners and task_delete_partners.
Revision ID: 41fd1ae225aq
Revises: 31fd1ae8d5bb
Create Date: 2024-07-29 17:11:00.00000
"""
from uuid import uuid4
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "41fd1ae225aq"
down_revision = "31fd1ae8d5bb"
branch_labels = None
depends_on = None
workflows = [
{"name": "task_create_partners", "description": "Create partners", "workflow_id": uuid4(), "target": "SYSTEM"},
{"name": "task_modify_partners", "description": "Modify partners", "workflow_id": uuid4(), "target": "SYSTEM"},
{"name": "task_delete_partners", "description": "Delete partners", "workflow_id": uuid4(), "target": "SYSTEM"},
]
def upgrade() -> None:
conn = op.get_bind()
for workflow in workflows:
conn.execute(
sa.text(
"INSERT INTO workflows VALUES (:workflow_id, :name, :target, :description, now()) ON CONFLICT DO NOTHING"
),
workflow,
)
op.alter_column('partners', 'email',
existing_type=sa.String(),
nullable=False)
op.drop_column(
'partners',
'partner_type'
)
op.drop_column(
'partners',
'as_number'
)
op.drop_column(
'partners',
'as_set'
)
op.drop_column(
'partners',
'route_set'
)
op.drop_column(
'partners',
'black_listed_as_sets'
)
op.drop_column(
'partners',
'additional_routers'
)
op.drop_column(
'partners',
'additional_bgp_speakers'
)
def downgrade() -> None:
conn = op.get_bind()
for workflow in workflows:
conn.execute(sa.text("DELETE FROM workflows WHERE name = :name"), {"name": workflow["name"]})
"""It is used to group the schema files together as a package."""
"""Partner schema module."""
from datetime import datetime
from uuid import uuid4
from pydantic import BaseModel, ConfigDict, EmailStr, Field
from gso.db.models import PartnerType
class PartnerCreate(BaseModel):
"""Partner create schema."""
partner_id: str = Field(default_factory=lambda: str(uuid4()))
name: str
email: EmailStr | None = None
as_number: str | None = None
as_set: str | None = None
route_set: str | None = None
black_listed_as_sets: list[str] | None = None
additional_routers: list[str] | None = None
additional_bgp_speakers: list[str] | None = None
partner_type: PartnerType
created_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
updated_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
model_config = ConfigDict(from_attributes=True)
"""A module that returns the partners available in :term:`GSO`."""
from datetime import datetime
from typing import Any
from uuid import uuid4
from orchestrator.db import db
from sqlalchemy.exc import NoResultFound, SQLAlchemyError
from pydantic import BaseModel, ConfigDict, EmailStr, Field
from sqlalchemy import func
from sqlalchemy.exc import NoResultFound
from gso.db.models import PartnerTable
from gso.schema.partner import PartnerCreate
class PartnerSchema(BaseModel):
"""Partner schema."""
partner_id: str = Field(default_factory=lambda: str(uuid4()))
name: str
email: EmailStr
created_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
updated_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
model_config = ConfigDict(from_attributes=True)
class PartnerNotFoundError(Exception):
......@@ -22,46 +37,84 @@ def get_all_partners() -> list[dict]:
def get_partner_by_name(name: str) -> dict[str, Any]:
"""Try to get a partner by their name."""
try:
partner = PartnerTable.query.filter(PartnerTable.name == name).one()
partner = db.session.query(PartnerTable).filter(PartnerTable.name == name).one()
return partner.__json__()
except NoResultFound as e:
msg = f"partner {name} not found"
raise PartnerNotFoundError(msg) from e
def get_partner_by_id(partner_id: str) -> PartnerTable:
"""Try to get a partner by their id."""
partner = db.session.query(PartnerTable).filter_by(partner_id=partner_id).first()
if not partner:
raise PartnerNotFoundError
return partner
def filter_partners_by_attribute(
attribute: str, value: str, *, case_sensitive: bool = True
) -> list[dict[str, Any]] | None:
"""Filter the list of partners by a specified attribute."""
if case_sensitive:
partners = db.session.query(PartnerTable).filter(getattr(PartnerTable, attribute) == value).all()
else:
partners = (
db.session.query(PartnerTable)
.filter(func.lower(getattr(PartnerTable, attribute)) == func.lower(value))
.all()
)
return [partner.__json__() for partner in partners] if partners else None
def filter_partners_by_name(name: str, *, case_sensitive: bool = True) -> list[dict[str, Any]] | None:
"""Filter the list of partners by name."""
return filter_partners_by_attribute("name", name, case_sensitive=case_sensitive)
def filter_partners_by_email(email: str, *, case_sensitive: bool = True) -> list[dict[str, Any]] | None:
"""Filter the list of partners by email."""
return filter_partners_by_attribute("email", email, case_sensitive=case_sensitive)
def create_partner(
partner_data: PartnerCreate,
partner_data: PartnerSchema,
) -> dict:
"""Create a new partner and add it to the database using Pydantic schema for validation.
:param partner_data: Partner data validated by Pydantic schema.
:return: JSON representation of the created partner.
"""
try:
new_partner = PartnerTable(**partner_data.model_dump())
new_partner = PartnerTable(**partner_data.model_dump())
db.session.add(new_partner)
db.session.commit()
db.session.add(new_partner)
db.session.commit()
return new_partner.__json__()
return new_partner.__json__()
except SQLAlchemyError:
db.session.rollback()
raise
finally:
db.session.close()
def edit_partner(
partner_data: PartnerSchema,
) -> PartnerTable:
"""Edit an existing partner and update it in the database."""
partner = get_partner_by_id(partner_id=partner_data.partner_id)
def delete_partner_by_name(name: str) -> None:
"""Delete a partner by their name."""
try:
partner = PartnerTable.query.filter(PartnerTable.name == name).one()
db.session.delete(partner)
db.session.commit()
except NoResultFound as e:
msg = f"partner {name} not found"
raise PartnerNotFoundError(msg) from e
except SQLAlchemyError:
db.session.rollback()
raise
finally:
db.session.close()
if partner_data.name:
partner.name = partner_data.name
if partner_data.email:
partner.email = partner_data.email
partner.updated_at = datetime.now().astimezone()
db.session.commit()
return partner
def delete_partner(partner_id: str) -> None:
"""Delete an existing partner from the database."""
partner = get_partner_by_id(partner_id=partner_id)
db.session.delete(partner)
db.session.commit()
......@@ -15,6 +15,7 @@ from orchestrator.db import (
SubscriptionInstanceTable,
SubscriptionInstanceValueTable,
SubscriptionTable,
db,
)
from orchestrator.domain import SubscriptionModel
from orchestrator.services.subscriptions import query_in_use_by_subscriptions
......@@ -29,10 +30,11 @@ SubscriptionType = dict[str, Any]
def get_subscriptions(
product_types: list[ProductType],
product_types: list[ProductType] | None = None,
lifecycles: list[SubscriptionLifecycle] | None = None,
includes: list[str] | None = None,
excludes: list[str] | None = None,
partner_id: UUIDstr | None = None,
) -> list[SubscriptionType]:
"""Retrieve active subscriptions for a specific product type.
......@@ -40,13 +42,11 @@ def get_subscriptions(
:param SubscriptionLifecycle lifecycles: The lifecycles that the products must be in.
:param list[str] includes: List of fields to be included in the returned Subscription objects.
:param list[str] excludes: List of fields to be excluded from the returned Subscription objects.
:param UUIDstr partner_id: The partner_id of subscriptions.
:return: A list of Subscription objects that match the query.
:rtype: list[Subscription]
"""
if not lifecycles:
lifecycles = list(SubscriptionLifecycle)
if not includes:
includes = [col.name for col in SubscriptionTable.__table__.columns]
......@@ -55,10 +55,16 @@ def get_subscriptions(
dynamic_fields = [getattr(SubscriptionTable, field) for field in includes]
query = SubscriptionTable.query.join(ProductTable).filter(
ProductTable.product_type.in_([str(product_type) for product_type in product_types]),
SubscriptionTable.status.in_([str(lifecycle) for lifecycle in lifecycles]),
)
query = db.session.query(SubscriptionTable).join(ProductTable)
if product_types:
query = query.filter(ProductTable.product_type.in_([str(product_type) for product_type in product_types]))
if lifecycles:
query = query.filter(SubscriptionTable.status.in_([str(lifecycle) for lifecycle in lifecycles]))
if partner_id:
query = query.filter(SubscriptionTable.customer_id == partner_id)
results = query.with_entities(*dynamic_fields).all()
......
......@@ -68,6 +68,9 @@
"validate_iptrunk": "Validate IP Trunk configuration",
"validate_router": "Validate router configuration",
"task_validate_geant_products": "Validation task for GEANT products",
"task_send_email_notifications": "Send email notifications for failed tasks"
"task_send_email_notifications": "Send email notifications for failed tasks",
"task_create_partners": "Create partner task",
"task_modify_partners": "Modify partner task",
"task_delete_partners": "Delete partner task"
}
}
......@@ -69,3 +69,6 @@ LazyWorkflowInstance("gso.workflows.opengear.import_opengear", "import_opengear"
# Tasks
LazyWorkflowInstance("gso.workflows.tasks.send_email_notifications", "task_send_email_notifications")
LazyWorkflowInstance("gso.workflows.tasks.validate_geant_products", "task_validate_geant_products")
LazyWorkflowInstance("gso.workflows.tasks.create_partners", "task_create_partners")
LazyWorkflowInstance("gso.workflows.tasks.modify_partners", "task_modify_partners")
LazyWorkflowInstance("gso.workflows.tasks.delete_partners", "task_delete_partners")
"""A creation workflow that create a partner."""
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State
from orchestrator.workflow import StepList, begin, done, step, workflow
from pydantic import ConfigDict, EmailStr, field_validator
from gso.services.partners import PartnerSchema, create_partner, filter_partners_by_email, filter_partners_by_name
def initial_input_form_generator() -> FormGenerator:
"""Gather input from the user needed for creating a partner."""
class CreatePartnerForm(FormPage):
model_config = ConfigDict(title="Create a Partner")
name: str
email: EmailStr
@field_validator("name")
def validate_name(cls, name: str) -> str:
if filter_partners_by_name(name=name, case_sensitive=False):
msg = "Partner with this name already exists."
raise ValueError(msg)
return name
@field_validator("email")
def validate_email(cls, email: str) -> EmailStr:
email = email.lower()
if filter_partners_by_email(email=email, case_sensitive=False):
msg = "Partner with this email already exists."
raise ValueError(msg)
return email
initial_user_input = yield CreatePartnerForm
return initial_user_input.model_dump()
@step("Save partner information to database")
def save_partner_to_database(
name: str,
email: EmailStr,
) -> State:
"""Save user input as a new partner in database."""
partner = create_partner(
partner_data=PartnerSchema(
name=name,
email=email,
)
)
return {"created_partner": partner}
@workflow(
"Create partners",
initial_input_form=initial_input_form_generator,
target=Target.SYSTEM,
)
def task_create_partners() -> StepList:
"""Create a new Partner."""
return begin >> save_partner_to_database >> done
"""A delete workflow that remove a partner."""
from enum import Enum
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State
from orchestrator.workflow import StepList, begin, done, step, workflow
from pydantic import ConfigDict, EmailStr, field_validator
from pydantic_forms.types import UUIDstr
from pydantic_forms.validators import Choice
from gso.services.partners import delete_partner, get_all_partners, get_partner_by_name
from gso.services.subscriptions import get_subscriptions
def initial_input_form_generator() -> FormGenerator:
"""Gather input from the user needed for deleting a partner."""
partners = {}
for partner in get_all_partners():
partners[partner["partner_id"]] = partner["name"]
partner_choice = Choice("Select a partner", zip(partners.values(), partners.items(), strict=True)) # type: ignore[arg-type]
class SelectPartnerForm(FormPage):
model_config = ConfigDict(title="Delete a Partner")
partners: partner_choice # type: ignore[valid-type]
@field_validator("partners")
def validate_partners(cls, value: Enum) -> Enum:
if get_subscriptions(partner_id=str(value)):
msg = "This partner has associated data and cannot be removed."
raise ValueError(msg)
return value
initial_user_input = yield SelectPartnerForm
partner = get_partner_by_name(name=initial_user_input.partners.name)
return {"email": partner["email"], "name": partner["name"], "partner_id": partner["partner_id"]}
@step("Delete partner information to database")
def delete_partner_from_database(
partner_id: UUIDstr,
name: str,
email: EmailStr,
) -> State:
"""Delete a partner from database."""
delete_partner(partner_id=partner_id)
return {"deleted_partner": {"name": name, "email": email, "partner_id": partner_id}}
@workflow(
"Delete partners",
initial_input_form=initial_input_form_generator,
target=Target.SYSTEM,
)
def task_delete_partners() -> StepList:
"""Delete a Partner."""
return begin >> delete_partner_from_database >> done
"""A modification workflow that modifies a partner."""
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State
from orchestrator.workflow import StepList, begin, done, step, workflow
from pydantic import ConfigDict, EmailStr, field_validator
from pydantic_forms.types import UUIDstr
from pydantic_forms.validators import Choice
from gso.services.partners import (
PartnerSchema,
edit_partner,
filter_partners_by_email,
filter_partners_by_name,
get_all_partners,
get_partner_by_name,
)
def initial_input_form_generator() -> FormGenerator:
"""Gather input from the user needed for modifying a partner."""
partners = {}
for partner in get_all_partners():
partners[partner["partner_id"]] = partner["name"]
partner_choice = Choice("Select a partner", zip(partners.values(), partners.items(), strict=True)) # type: ignore[arg-type]
class SelectPartnerForm(FormPage):
model_config = ConfigDict(title="Choose a Partner")
partners: partner_choice # type: ignore[valid-type]
initial_user_input = yield SelectPartnerForm
partner = get_partner_by_name(name=initial_user_input.partners.name)
class ModifyPartnerForm(FormPage):
model_config = ConfigDict(title="Modify a Partner")
name: str = partner["name"]
email: EmailStr = partner["email"]
@field_validator("name")
def validate_name(cls, name: str) -> str:
if partner["name"] != name and filter_partners_by_name(name=name, case_sensitive=False):
msg = "Partner with this name already exists."
raise ValueError(msg)
return name
@field_validator("email")
def validate_email(cls, email: str) -> EmailStr:
if partner["email"] != email and filter_partners_by_email(email=email, case_sensitive=False):
msg = "Partner with this email already exists."
raise ValueError(msg)
return email
user_input = yield ModifyPartnerForm
return user_input.model_dump() | {"partner_id": partner["partner_id"]}
@step("Save partner information to database")
def save_partner_to_database(
partner_id: UUIDstr,
name: str,
email: EmailStr,
) -> State:
"""Save modified partner in database."""
partner = edit_partner(
partner_data=PartnerSchema(
partner_id=partner_id,
name=name,
email=email,
)
)
return {"modified_partner": partner}
@workflow(
"Modify partners",
initial_input_form=initial_input_form_generator,
target=Target.SYSTEM,
)
def task_modify_partners() -> StepList:
"""Modify a Partner."""
return begin >> save_partner_to_database >> done
......@@ -32,10 +32,8 @@ from sqlalchemy.orm import scoped_session, sessionmaker
from starlette.testclient import TestClient
from urllib3_mock import Responses
from gso.db.models import PartnerType
from gso.main import init_gso_app
from gso.schema.partner import PartnerCreate
from gso.services.partners import create_partner
from gso.services.partners import PartnerSchema, create_partner
from gso.utils.helpers import LAGMember
from test.fixtures import ( # noqa: F401
iptrunk_side_subscription_factory,
......@@ -268,8 +266,24 @@ def test_client(fastapi_app):
@pytest.fixture(scope="session")
def geant_partner():
return create_partner(PartnerCreate(name="GEANT-TEST", partner_type=PartnerType.GEANT, email="goat-test@geant.org"))
def partner_factory():
def _create_partner(
name: str,
email: str,
) -> dict:
return create_partner(
PartnerSchema(
name=name,
email=email,
)
)
return _create_partner
@pytest.fixture(scope="session")
def geant_partner(partner_factory):
return partner_factory(name="GEANT-TEST", email="goat-test@geant.org")
@pytest.fixture()
......
......@@ -6,12 +6,19 @@ from uuid import uuid4
import pytest
from orchestrator import step, workflow
from orchestrator.config.assignee import Assignee
from orchestrator.db import db
from orchestrator.db import (
ProductTable,
SubscriptionInstanceTable,
SubscriptionInstanceValueTable,
SubscriptionTable,
db,
)
from orchestrator.domain import SubscriptionModel
from orchestrator.types import SubscriptionLifecycle, UUIDstr
from orchestrator.utils.datetime import nowtz
from orchestrator.workflow import done, init, inputstep
from pydantic_forms.core import FormPage
from pydantic_forms.types import FormGenerator
from pydantic_forms.types import FormGenerator, SubscriptionMapping
from pydantic_forms.validators import Choice
from gso.products import ProductName
......@@ -559,3 +566,73 @@ def test_workflow(generic_subscription_1: UUIDstr, generic_product_type_1) -> Ge
with WorkflowInstanceForTests(workflow_for_testing_processes_py, "workflow_for_testing_processes_py") as wf:
yield wf
def create_subscription_for_mapping(
product: ProductTable, mapping: SubscriptionMapping, values: dict[str, Any], **kwargs: Any
) -> SubscriptionTable:
"""Create a subscription in the test coredb for the given subscription_mapping and values.
This function handles optional resource types starting with a ? in the mapping not supplied in the values array.
Args:
product: the ProductTable to create a sub for
mapping: the subscription_mapping belonging to that product
values: a dictionary of keys from the sub_map and their corresponding test values
kwargs: The rest of the arguments
Returns: The conforming subscription.
"""
def build_instance(name, value_mapping):
block = product.find_block_by_name(name)
def build_value(rt, value):
resource_type = block.find_resource_type_by_name(rt)
return SubscriptionInstanceValueTable(resource_type_id=resource_type.resource_type_id, value=value)
return SubscriptionInstanceTable(
product_block_id=block.product_block_id,
values=[
build_value(resource_type, values[value_key]) for (resource_type, value_key) in value_mapping.items()
],
)
# recreate the mapping: leave out the ?keys if no value supplied for them
mapping = {
name: [
{
**{k: value_map[k] for k in value_map if not value_map[k].startswith("?")},
**{
k: value_map[k][1:]
for k in value_map
if value_map[k].startswith("?") and value_map[k][1:] in values
},
}
for value_map in mapping[name]
]
for name in mapping
}
instances = [
build_instance(name, value_mapping)
for (name, value_mappings) in mapping.items()
for value_mapping in value_mappings
]
return create_subscription(instances=instances, product=product, **kwargs)
def create_subscription(**kwargs):
attrs = {
"description": "A subscription.",
"customer_id": kwargs.get("customer_id", "85938c4c-0a11-e511-80d0-005056956c1a"),
"start_date": nowtz(),
"status": "active",
"insync": True,
**kwargs,
}
o = SubscriptionTable(**attrs)
db.session.add(o)
db.session.commit()
return o
import pytest
from pydantic_forms.exceptions import FormValidationError
from gso.services.partners import get_partner_by_name
from test.workflows import assert_complete, extract_state, run_workflow
@pytest.mark.workflow()
def test_create_partner_success():
result, _, _ = run_workflow(
"task_create_partners",
[
{
"name": "GEANT-TEST-CREATION",
"email": "goat-test-creation@geant.org",
}
],
)
assert_complete(result)
state = extract_state(result)
partner = get_partner_by_name(state["name"])
assert partner["name"] == "GEANT-TEST-CREATION"
assert partner["email"] == "goat-test-creation@geant.org"
@pytest.mark.workflow()
def test_create_partner_with_invalid_input_fails():
with pytest.raises(FormValidationError) as error:
run_workflow(
"task_create_partners",
[
{
"name": "Kenneth Boyle",
"email": "invalid_email",
}
],
)
errors = error.value.errors
email_error = errors[0]
assert email_error["loc"] == ("email",)
assert "valid email address" in email_error["msg"]
def test_create_partner_with_duplicate_name_or_email_fails(partner_factory):
partner_factory(
name="new_name",
email="myemail@gmail.com",
)
with pytest.raises(FormValidationError) as error:
run_workflow(
"task_create_partners",
[
{
"name": "NEW_name",
"email": "myemail@gmail.com",
}
],
)
assert error.value.errors[0]["msg"] == "Partner with this name already exists."
assert error.value.errors[1]["msg"] == "Partner with this email already exists."
from uuid import uuid4
import pytest
from orchestrator.db import ProductTable, db
from pydantic_forms.exceptions import FormValidationError
from sqlalchemy import select
from gso.services.partners import filter_partners_by_name
from test.fixtures import create_subscription_for_mapping
from test.workflows import assert_complete, run_workflow
CORRECT_SUBSCRIPTION = str(uuid4())
def get_one_product(product_name):
return db.session.scalars(select(ProductTable).where(ProductTable.name == product_name)).one()
@pytest.mark.workflow()
def test_delete_partner_success(partner_factory):
partner = partner_factory(
name="new_name",
email="myemail@gmail.com",
)
assert filter_partners_by_name(name="new_name", case_sensitive=False)
result, _, _ = run_workflow(
"task_delete_partners",
[
{"partners": partner["partner_id"]},
],
)
assert_complete(result)
assert filter_partners_by_name(name="new_name", case_sensitive=False) is None
def test_delete_partner_with_associated_data_fails(generic_product_3, partner_factory):
partner = partner_factory(
name="new_name",
email="myemail@gmail.com",
)
subscription_mapping = {"PB_2": [{"rt_3": "info.id", "rt_2": "info2.id"}]}
values = {"info.id": "0", "info2.id": "X"}
product = get_one_product("Product 3")
create_subscription_for_mapping(
product, subscription_mapping, values, subscription_id=CORRECT_SUBSCRIPTION, customer_id=partner["partner_id"]
)
with pytest.raises(FormValidationError) as error:
run_workflow(
"task_delete_partners",
[
{"partners": partner["partner_id"]},
],
)
assert error.value.errors[0]["msg"] == "This partner has associated data and cannot be removed."
import pytest
from pydantic_forms.exceptions import FormValidationError
from gso.services.partners import get_partner_by_id
from test.workflows import assert_complete, run_workflow
@pytest.mark.workflow()
def test_modify_partner_success(partner_factory):
partner = partner_factory(
name="new_name",
email="myemail@gmail.com",
)
result, _, _ = run_workflow(
"task_modify_partners",
[
{"partners": partner["partner_id"]},
{
"name": "GEANT-TEST-CREATION",
"email": "goat-test-creation@geant.org",
},
],
)
assert_complete(result)
partner_db = get_partner_by_id(partner_id=partner["partner_id"])
assert partner_db.name == "GEANT-TEST-CREATION"
assert partner_db.email == "goat-test-creation@geant.org"
@pytest.mark.workflow()
def test_modify_partner_with_same_date_success(partner_factory):
partner = partner_factory(
name="new_name",
email="myemail@gmail.com",
)
result, _, _ = run_workflow(
"task_modify_partners",
[
{"partners": partner["partner_id"]},
{
"name": "new_name",
"email": "myemail@gmail.com",
},
],
)
assert_complete(result)
partner_db = get_partner_by_id(partner_id=partner["partner_id"])
assert partner_db.name == "new_name"
assert partner_db.email == "myemail@gmail.com"
@pytest.mark.workflow()
def test_modify_partner_with_duplicate_name_or_email_fails(partner_factory):
partner_factory(
name="new_name",
email="myemail@gmail.com",
)
partner_2 = partner_factory(
name="new_name_2",
email="myemail2@gmail.com",
)
with pytest.raises(FormValidationError) as error:
run_workflow(
"task_modify_partners",
[
{"partners": partner_2["partner_id"]},
{
"name": "new_name",
"email": "myemail@gmail.com",
},
],
)
assert error.value.errors[0]["msg"] == "Partner with this name already exists."
assert error.value.errors[1]["msg"] == "Partner with this email already exists."
@pytest.mark.workflow()
def test_modify_partner_with_invalid_input_fails(partner_factory):
partner = partner_factory(
name="new_name_2",
email="myemail2@gmail.com",
)
with pytest.raises(FormValidationError) as error:
run_workflow(
"task_modify_partners",
[
{"partners": partner["partner_id"]},
{
"name": "Kenneth Boyle",
"email": "invalid_email",
},
],
)
errors = error.value.errors
email_error = errors[0]
assert email_error["loc"] == ("email",)
assert "valid email address" in email_error["msg"]
......@@ -4,7 +4,7 @@ from test.workflows import assert_complete, extract_state, run_workflow
@pytest.mark.workflow()
def test_task_validate_geant_products(responses, faker):
def test_task_validate_geant_products(responses):
result, _, _ = run_workflow("task_validate_geant_products", [{}])
assert_complete(result)
state = extract_state(result)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment