Skip to content
Snippets Groups Projects
Commit 7a998401 authored by Mohammad Torkashvand's avatar Mohammad Torkashvand
Browse files

add a task for partner creation

parent 01016f7a
No related branches found
No related tags found
No related merge requests found
Pipeline #88128 passed
This commit is part of merge request !238. Comments created here will be created in the context of that merge request.
......@@ -33,7 +33,7 @@ class PartnerTable(BaseModel):
__tablename__ = "partners"
partner_id = mapped_column(String, server_default=text("uuid_generate_v4"), primary_key=True)
name = mapped_column(String, unique=True, nullable=True)
name = mapped_column(String, unique=True, nullable=False)
email = mapped_column(String, unique=True, nullable=False)
partner_type = mapped_column(Enum(PartnerType), nullable=False)
......
"""Add task_validate_geant_products.
Revision ID: 41fd1ae225aq
Revises: 31fd1ae8d5bb
Create Date: 2024-07-29 17:11:00.00000
"""
from uuid import uuid4
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "41fd1ae225aq"
down_revision = "31fd1ae8d5bb"
branch_labels = None
depends_on = None
workflows = [
{"name": "task_create_partners", "description": "Create partners", "workflow_id": uuid4(), "target": "SYSTEM"},
]
def upgrade() -> None:
conn = op.get_bind()
for workflow in workflows:
conn.execute(
sa.text(
"INSERT INTO workflows VALUES (:workflow_id, :name, :target, :description, now()) ON CONFLICT DO NOTHING"
),
workflow,
)
op.alter_column('partners', 'email',
existing_type=sa.String(),
nullable=False)
def downgrade() -> None:
conn = op.get_bind()
for workflow in workflows:
conn.execute(sa.text("DELETE FROM workflows WHERE name = :name"), {"name": workflow["name"]})
"""It is used to group the schema files together as a package."""
"""Partner schema module."""
from datetime import datetime
from uuid import uuid4
from pydantic import BaseModel, ConfigDict, EmailStr, Field
from gso.db.models import PartnerType
class PartnerCreate(BaseModel):
"""Partner create schema."""
partner_id: str = Field(default_factory=lambda: str(uuid4()))
name: str
email: EmailStr | None = None
as_number: str | None = None
as_set: str | None = None
route_set: str | None = None
black_listed_as_sets: list[str] | None = None
additional_routers: list[str] | None = None
additional_bgp_speakers: list[str] | None = None
partner_type: PartnerType
created_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
updated_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
model_config = ConfigDict(from_attributes=True)
"""A module that returns the partners available in :term:`GSO`."""
from datetime import datetime
from typing import Any
from uuid import uuid4
from orchestrator.db import db
from sqlalchemy.exc import NoResultFound, SQLAlchemyError
from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator
from sqlalchemy.exc import NoResultFound
from gso.db.models import PartnerTable
from gso.schema.partner import PartnerCreate
from gso.db.models import PartnerTable, PartnerType
class PartnerCreate(BaseModel):
"""Partner create schema."""
partner_id: str = Field(default_factory=lambda: str(uuid4()))
name: str
email: EmailStr
partner_type: PartnerType
as_number: str | None = None
as_set: str | None = None
route_set: str | None = None
black_listed_as_sets: list[str] | None = None
additional_routers: list[str] | None = None
additional_bgp_speakers: list[str] | None = None
created_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
updated_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
model_config = ConfigDict(from_attributes=True)
@field_validator("name")
def validate_name(cls, name: str) -> str:
"""Validate name for duplication."""
if filter_partners_by_name(name=name):
msg = "Partner with this name already exists."
raise ValueError(msg)
return name
@field_validator("email")
def validate_email(cls, email: str) -> EmailStr:
"""Validate email input."""
if filter_partners_by_email(email=email):
msg = "Partner with this email already exists."
raise ValueError(msg)
return email
class PartnerNotFoundError(Exception):
......@@ -22,13 +61,25 @@ def get_all_partners() -> list[dict]:
def get_partner_by_name(name: str) -> dict[str, Any]:
"""Try to get a partner by their name."""
try:
partner = PartnerTable.query.filter(PartnerTable.name == name).one()
partner = db.session.query(PartnerTable).filter(PartnerTable.name == name).one()
return partner.__json__()
except NoResultFound as e:
msg = f"partner {name} not found"
raise PartnerNotFoundError(msg) from e
def filter_partners_by_name(name: str) -> list[dict[str, Any]] | None:
"""Filter the list of partners by name."""
partners = db.session.query(PartnerTable).filter(PartnerTable.name == name).all()
return [partner.__json__() for partner in partners] if partners else None
def filter_partners_by_email(email: str) -> list[dict[str, Any]] | None:
"""Filter the list of partners by email."""
partners = db.session.query(PartnerTable).filter(PartnerTable.email == email).all()
return [partner.__json__() for partner in partners] if partners else None
def create_partner(
partner_data: PartnerCreate,
) -> dict:
......@@ -37,31 +88,8 @@ def create_partner(
:param partner_data: Partner data validated by Pydantic schema.
:return: JSON representation of the created partner.
"""
try:
new_partner = PartnerTable(**partner_data.model_dump())
db.session.add(new_partner)
db.session.commit()
return new_partner.__json__()
except SQLAlchemyError:
db.session.rollback()
raise
finally:
db.session.close()
new_partner = PartnerTable(**partner_data.model_dump())
db.session.add(new_partner)
db.session.commit()
def delete_partner_by_name(name: str) -> None:
"""Delete a partner by their name."""
try:
partner = PartnerTable.query.filter(PartnerTable.name == name).one()
db.session.delete(partner)
db.session.commit()
except NoResultFound as e:
msg = f"partner {name} not found"
raise PartnerNotFoundError(msg) from e
except SQLAlchemyError:
db.session.rollback()
raise
finally:
db.session.close()
return new_partner.__json__()
......@@ -67,6 +67,7 @@
"import_opengear": "NOT FOR HUMANS -- Finalize import into an OpenGear",
"validate_iptrunk": "Validate IP Trunk configuration",
"validate_router": "Validate router configuration",
"task_validate_geant_products": "Validation task for GEANT products"
"task_validate_geant_products": "Validation task for GEANT products",
"task_create_partners": "Create partner task"
}
}
......@@ -68,3 +68,4 @@ LazyWorkflowInstance("gso.workflows.opengear.import_opengear", "import_opengear"
# Tasks
LazyWorkflowInstance("gso.workflows.tasks.validate_geant_products", "task_validate_geant_products")
LazyWorkflowInstance("gso.workflows.tasks.create_partners", "task_create_partners")
"""A creation workflow that create a partner."""
from typing import Annotated
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State
from orchestrator.workflow import StepList, begin, done, step, workflow
from pydantic import AfterValidator, ConfigDict, EmailStr, field_validator
from pydantic_forms.validators import validate_unique_list
from gso.db.models import PartnerType
from gso.services.partners import PartnerCreate, create_partner, filter_partners_by_email, filter_partners_by_name
UniqueStringList = Annotated[
list[str],
AfterValidator(validate_unique_list),
]
def initial_input_form_generator() -> FormGenerator:
"""Gather input from the user needed for creating a partner."""
class CreatePartnerForm(FormPage):
model_config = ConfigDict(title="Create a Partner")
name: str
email: EmailStr
partner_type: PartnerType
as_number: str | None = None
as_set: str | None = None
route_set: str | None = None
black_listed_as_sets: UniqueStringList
additional_routers: UniqueStringList
additional_bgp_speakers: UniqueStringList
@field_validator("name")
def validate_name(cls, name: str) -> str:
if filter_partners_by_name(name=name):
msg = "Partner with this name already exists."
raise ValueError(msg)
return name
@field_validator("email")
def validate_email(cls, email: str) -> EmailStr:
if filter_partners_by_email(email=email):
msg = "Partner with this email already exists."
raise ValueError(msg)
return email
initial_user_input = yield CreatePartnerForm
return initial_user_input.model_dump()
@step("Save partner information to database")
def save_partner_to_database(
name: str,
partner_type: PartnerType,
email: EmailStr,
as_number: str | None,
as_set: str | None,
route_set: str | None,
black_listed_as_sets: UniqueStringList | None,
additional_routers: UniqueStringList | None,
additional_bgp_speakers: UniqueStringList | None,
) -> State:
"""Save user input as a new partner in database."""
partner = create_partner(
partner_data=PartnerCreate(
name=name,
email=email,
partner_type=partner_type,
as_number=as_number,
as_set=as_set,
route_set=route_set,
black_listed_as_sets=black_listed_as_sets,
additional_routers=additional_routers,
additional_bgp_speakers=additional_bgp_speakers,
)
)
return {"created_partner": partner}
@workflow(
"Create partners",
initial_input_form=initial_input_form_generator,
target=Target.SYSTEM,
)
def task_create_partners() -> StepList:
"""Create a new Partner."""
return begin >> save_partner_to_database >> done
......@@ -33,8 +33,7 @@ from starlette.testclient import TestClient
from gso.db.models import PartnerType
from gso.main import init_gso_app
from gso.schema.partner import PartnerCreate
from gso.services.partners import create_partner
from gso.services.partners import PartnerCreate, create_partner
from gso.utils.helpers import LAGMember
from test.fixtures import ( # noqa: F401
iptrunk_side_subscription_factory,
......@@ -271,6 +270,36 @@ def geant_partner():
return create_partner(PartnerCreate(name="GEANT-TEST", partner_type=PartnerType.GEANT, email="goat-test@geant.org"))
@pytest.fixture(scope="session")
def partner_factory():
def _create_partner(
name: str,
email: str,
partner_type: PartnerType,
as_number: str | None = None,
as_set: str | None = None,
rout_set: str | None = None,
black_listed_as_set: list[str] | None = None,
additional_routers: list[str] | None = None,
additional_bgp_speakers: list[str] | None = None,
) -> dict:
return create_partner(
PartnerCreate(
name=name,
email=email,
partner_type=partner_type,
as_number=as_number,
as_set=as_set,
rout_set=rout_set,
black_listed_as_set=black_listed_as_set,
additional_routers=additional_routers,
additional_bgp_speakers=additional_bgp_speakers,
)
)
return _create_partner
@pytest.fixture()
def generic_resource_type_1():
rt = ResourceTypeTable(description="Resource Type one", resource_type="rt_1")
......
import pytest
from pydantic_forms.exceptions import FormValidationError
from gso.db.models import PartnerType
from gso.services.partners import get_partner_by_name
from test.workflows import assert_complete, extract_state, run_workflow
@pytest.mark.workflow()
def test_create_partner_with_minimal_input(responses):
result, _, _ = run_workflow(
"task_create_partners",
[
{
"name": "GEANT-TEST-CREATION",
"email": "goat-test-creation@geant.org",
"partner_type": PartnerType.GEANT,
"black_listed_as_sets": [],
"additional_routers": [],
"additional_bgp_speakers": [],
}
],
)
assert_complete(result)
state = extract_state(result)
partner = get_partner_by_name(state["name"])
assert partner["name"] == "GEANT-TEST-CREATION"
assert partner["email"] == "goat-test-creation@geant.org"
assert partner["partner_type"] == PartnerType.GEANT
assert partner["as_number"] is None
assert partner["as_set"] is None
assert partner["route_set"] is None
assert partner["black_listed_as_sets"] == []
assert partner["additional_routers"] == []
assert partner["additional_bgp_speakers"] == []
@pytest.mark.workflow()
def test_create_partner_with_full_input(responses):
result, _, _ = run_workflow(
"task_create_partners",
[
{
"name": "GEANT-TEST-CREATION",
"email": "goat-test-creation@geant.org",
"partner_type": PartnerType.GEANT,
"as_number": "1212",
"as_set": "1212",
"route_set": "1212",
"black_listed_as_sets": ["a"],
"additional_routers": ["b"],
"additional_bgp_speakers": ["c"],
}
],
)
assert_complete(result)
state = extract_state(result)
partner = get_partner_by_name(state["name"])
assert partner["name"] == "GEANT-TEST-CREATION"
assert partner["email"] == "goat-test-creation@geant.org"
assert partner["partner_type"] == PartnerType.GEANT
assert partner["as_number"] == "1212"
assert partner["as_set"] == "1212"
assert partner["route_set"] == "1212"
assert partner["black_listed_as_sets"] == ["a"]
assert partner["additional_routers"] == ["b"]
assert partner["additional_bgp_speakers"] == ["c"]
@pytest.mark.workflow()
def test_create_partner_with_invalid_input_fails(responses, faker):
with pytest.raises(FormValidationError) as error:
run_workflow(
"task_create_partners",
[
{
"name": "Kenneth Boyle",
"email": "invalid_email",
"black_listed_as_sets": ["a", "a"],
"additional_routers": ["b", "b"],
"additional_bgp_speakers": ["c", "c", "c"],
}
],
)
errors = error.value.errors
# Check the email error
email_error = errors[0]
assert email_error["loc"] == ("email",)
assert "valid email address" in email_error["msg"]
# Check the partner_type error
partner_type_error = errors[1]
assert partner_type_error["loc"] == ("partner_type",)
assert partner_type_error["msg"] == "Field required"
# Check the unique_list errors
unique_list_errors = errors[2:]
for unique_error, field in zip(
unique_list_errors, ["black_listed_as_sets", "additional_routers", "additional_bgp_speakers"], strict=True
):
assert unique_error["type"] == "unique_list"
assert unique_error["loc"] == (field,)
assert unique_error["msg"] == "List must be unique"
def test_create_partner_with_duplicate_name_or_email_fails(partner_factory):
partner_factory(
name="new_name",
email="myemail@gmail.com",
partner_type=PartnerType.PRIVATE_PEER,
)
with pytest.raises(FormValidationError) as error:
run_workflow(
"task_create_partners",
[
{
"name": "new_name",
"email": "myemail@gmail.com",
"black_listed_as_sets": [],
"additional_routers": [],
"additional_bgp_speakers": [],
}
],
)
assert error.value.errors[0]["msg"] == "Partner with this name already exists."
assert error.value.errors[1]["msg"] == "Partner with this email already exists."
......@@ -4,7 +4,7 @@ from test.workflows import assert_complete, extract_state, run_workflow
@pytest.mark.workflow()
def test_task_validate_geant_products(responses, faker):
def test_task_validate_geant_products(responses):
result, _, _ = run_workflow("task_validate_geant_products", [{}])
assert_complete(result)
state = extract_state(result)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment