Skip to content
Snippets Groups Projects
Commit 5c65d696 authored by Mohammad Torkashvand's avatar Mohammad Torkashvand
Browse files

add task for partner creation,modification and deletion

parent 01016f7a
No related branches found
No related tags found
No related merge requests found
Pipeline #88141 failed
Showing
with 603 additions and 88 deletions
"""Database model definitions and table mappings for the GSO system.""" """Database model definitions and table mappings for the GSO system."""
import enum
import structlog import structlog
from orchestrator.db import UtcTimestamp from orchestrator.db import UtcTimestamp
from orchestrator.db.database import BaseModel from orchestrator.db.database import BaseModel
from sqlalchemy import ( from sqlalchemy import (
Enum,
String, String,
text, text,
) )
from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.orm import mapped_column from sqlalchemy.orm import mapped_column
logger = structlog.get_logger(__name__) logger = structlog.get_logger(__name__)
class PartnerType(str, enum.Enum):
"""Defining different types of partners in the GSO system."""
NREN = "NREN"
RE_PEER = "RE_PEER"
PUBLIC_PEER = "PUBLIC_PEER"
PRIVATE_PEER = "PRIVATE_PEER"
UPSTREAM = "UPSTREAM"
GEANT = "GEANT"
class PartnerTable(BaseModel): class PartnerTable(BaseModel):
"""Database table for the partners in the GSO system.""" """Database table for the partners in the GSO system."""
__tablename__ = "partners" __tablename__ = "partners"
partner_id = mapped_column(String, server_default=text("uuid_generate_v4"), primary_key=True) partner_id = mapped_column(String, server_default=text("uuid_generate_v4"), primary_key=True)
name = mapped_column(String, unique=True, nullable=True) name = mapped_column(String, unique=True, nullable=False)
email = mapped_column(String, unique=True, nullable=False) email = mapped_column(String, unique=True, nullable=False)
partner_type = mapped_column(Enum(PartnerType), nullable=False)
as_number = mapped_column(
String, unique=True, nullable=True
) # the as_number and as_set are mutually exclusive. if you give me one I don't need the other
as_set = mapped_column(String, nullable=True)
route_set = mapped_column(String, nullable=True)
black_listed_as_sets = mapped_column(ARRAY(String), nullable=True)
additional_routers = mapped_column(ARRAY(String), nullable=True)
additional_bgp_speakers = mapped_column(ARRAY(String), nullable=True)
created_at = mapped_column(UtcTimestamp, server_default=text("current_timestamp"), nullable=False) created_at = mapped_column(UtcTimestamp, server_default=text("current_timestamp"), nullable=False)
updated_at = mapped_column( updated_at = mapped_column(
......
"""Add task_validate_geant_products.
Revision ID: 41fd1ae225aq
Revises: 31fd1ae8d5bb
Create Date: 2024-07-29 17:11:00.00000
"""
from uuid import uuid4
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "41fd1ae225aq"
down_revision = "31fd1ae8d5bb"
branch_labels = None
depends_on = None
workflows = [
{"name": "task_create_partners", "description": "Create partners", "workflow_id": uuid4(), "target": "SYSTEM"},
{"name": "task_modify_partners", "description": "Modify partners", "workflow_id": uuid4(), "target": "SYSTEM"},
{"name": "task_delete_partners", "description": "Delete partners", "workflow_id": uuid4(), "target": "SYSTEM"},
]
def upgrade() -> None:
conn = op.get_bind()
for workflow in workflows:
conn.execute(
sa.text(
"INSERT INTO workflows VALUES (:workflow_id, :name, :target, :description, now()) ON CONFLICT DO NOTHING"
),
workflow,
)
op.alter_column('partners', 'email',
existing_type=sa.String(),
nullable=False)
op.drop_column(
'partners',
'partner_type'
)
op.drop_column(
'partners',
'as_number'
)
op.drop_column(
'partners',
'as_set'
)
op.drop_column(
'partners',
'route_set'
)
op.drop_column(
'partners',
'black_listed_as_sets'
)
op.drop_column(
'partners',
'additional_routers'
)
op.drop_column(
'partners',
'additional_bgp_speakers'
)
def downgrade() -> None:
conn = op.get_bind()
for workflow in workflows:
conn.execute(sa.text("DELETE FROM workflows WHERE name = :name"), {"name": workflow["name"]})
"""It is used to group the schema files together as a package."""
"""Partner schema module."""
from datetime import datetime
from uuid import uuid4
from pydantic import BaseModel, ConfigDict, EmailStr, Field
from gso.db.models import PartnerType
class PartnerCreate(BaseModel):
"""Partner create schema."""
partner_id: str = Field(default_factory=lambda: str(uuid4()))
name: str
email: EmailStr | None = None
as_number: str | None = None
as_set: str | None = None
route_set: str | None = None
black_listed_as_sets: list[str] | None = None
additional_routers: list[str] | None = None
additional_bgp_speakers: list[str] | None = None
partner_type: PartnerType
created_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
updated_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
model_config = ConfigDict(from_attributes=True)
"""A module that returns the partners available in :term:`GSO`.""" """A module that returns the partners available in :term:`GSO`."""
from datetime import datetime
from typing import Any from typing import Any
from uuid import uuid4
from orchestrator.db import db from orchestrator.db import db
from sqlalchemy.exc import NoResultFound, SQLAlchemyError from pydantic import BaseModel, ConfigDict, EmailStr, Field
from sqlalchemy import func
from sqlalchemy.exc import NoResultFound
from gso.db.models import PartnerTable from gso.db.models import PartnerTable
from gso.schema.partner import PartnerCreate
class PartnerSchema(BaseModel):
"""Partner schema."""
partner_id: str = Field(default_factory=lambda: str(uuid4()))
name: str
email: EmailStr
created_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
updated_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
model_config = ConfigDict(from_attributes=True)
class PartnerNotFoundError(Exception): class PartnerNotFoundError(Exception):
...@@ -22,46 +37,84 @@ def get_all_partners() -> list[dict]: ...@@ -22,46 +37,84 @@ def get_all_partners() -> list[dict]:
def get_partner_by_name(name: str) -> dict[str, Any]: def get_partner_by_name(name: str) -> dict[str, Any]:
"""Try to get a partner by their name.""" """Try to get a partner by their name."""
try: try:
partner = PartnerTable.query.filter(PartnerTable.name == name).one() partner = db.session.query(PartnerTable).filter(PartnerTable.name == name).one()
return partner.__json__() return partner.__json__()
except NoResultFound as e: except NoResultFound as e:
msg = f"partner {name} not found" msg = f"partner {name} not found"
raise PartnerNotFoundError(msg) from e raise PartnerNotFoundError(msg) from e
def get_partner_by_id(partner_id: str) -> PartnerTable:
"""Try to get a partner by their id."""
partner = db.session.query(PartnerTable).filter_by(partner_id=partner_id).first()
if not partner:
raise PartnerNotFoundError
return partner
def filter_partners_by_attribute(
attribute: str, value: str, *, case_sensitive: bool = True
) -> list[dict[str, Any]] | None:
"""Filter the list of partners by a specified attribute."""
if case_sensitive:
partners = db.session.query(PartnerTable).filter(getattr(PartnerTable, attribute) == value).all()
else:
partners = (
db.session.query(PartnerTable)
.filter(func.lower(getattr(PartnerTable, attribute)) == func.lower(value))
.all()
)
return [partner.__json__() for partner in partners] if partners else None
def filter_partners_by_name(name: str, *, case_sensitive: bool = True) -> list[dict[str, Any]] | None:
"""Filter the list of partners by name."""
return filter_partners_by_attribute("name", name, case_sensitive=case_sensitive)
def filter_partners_by_email(email: str, *, case_sensitive: bool = True) -> list[dict[str, Any]] | None:
"""Filter the list of partners by email."""
return filter_partners_by_attribute("email", email, case_sensitive=case_sensitive)
def create_partner( def create_partner(
partner_data: PartnerCreate, partner_data: PartnerSchema,
) -> dict: ) -> dict:
"""Create a new partner and add it to the database using Pydantic schema for validation. """Create a new partner and add it to the database using Pydantic schema for validation.
:param partner_data: Partner data validated by Pydantic schema. :param partner_data: Partner data validated by Pydantic schema.
:return: JSON representation of the created partner. :return: JSON representation of the created partner.
""" """
try: new_partner = PartnerTable(**partner_data.model_dump())
new_partner = PartnerTable(**partner_data.model_dump()) db.session.add(new_partner)
db.session.commit()
db.session.add(new_partner) return new_partner.__json__()
db.session.commit()
return new_partner.__json__()
except SQLAlchemyError:
db.session.rollback()
raise
finally:
db.session.close()
def edit_partner(
partner_data: PartnerSchema,
) -> PartnerTable:
"""Edit an existing partner and update it in the database."""
partner = get_partner_by_id(partner_id=partner_data.partner_id)
def delete_partner_by_name(name: str) -> None: if partner_data.name:
"""Delete a partner by their name.""" partner.name = partner_data.name
try: if partner_data.email:
partner = PartnerTable.query.filter(PartnerTable.name == name).one() partner.email = partner_data.email
db.session.delete(partner)
db.session.commit() partner.updated_at = datetime.now().astimezone()
except NoResultFound as e:
msg = f"partner {name} not found" db.session.commit()
raise PartnerNotFoundError(msg) from e
except SQLAlchemyError: return partner
db.session.rollback()
raise
finally: def delete_partner(partner_id: str) -> None:
db.session.close() """Delete an existing partner from the database."""
partner = get_partner_by_id(partner_id=partner_id)
db.session.delete(partner)
db.session.commit()
...@@ -67,6 +67,9 @@ ...@@ -67,6 +67,9 @@
"import_opengear": "NOT FOR HUMANS -- Finalize import into an OpenGear", "import_opengear": "NOT FOR HUMANS -- Finalize import into an OpenGear",
"validate_iptrunk": "Validate IP Trunk configuration", "validate_iptrunk": "Validate IP Trunk configuration",
"validate_router": "Validate router configuration", "validate_router": "Validate router configuration",
"task_validate_geant_products": "Validation task for GEANT products" "task_validate_geant_products": "Validation task for GEANT products",
"task_create_partners": "Create partner task",
"task_modify_partners": "Modify partner task",
"task_delete_partners": "Delete partner task"
} }
} }
...@@ -68,3 +68,6 @@ LazyWorkflowInstance("gso.workflows.opengear.import_opengear", "import_opengear" ...@@ -68,3 +68,6 @@ LazyWorkflowInstance("gso.workflows.opengear.import_opengear", "import_opengear"
# Tasks # Tasks
LazyWorkflowInstance("gso.workflows.tasks.validate_geant_products", "task_validate_geant_products") LazyWorkflowInstance("gso.workflows.tasks.validate_geant_products", "task_validate_geant_products")
LazyWorkflowInstance("gso.workflows.tasks.create_partners", "task_create_partners")
LazyWorkflowInstance("gso.workflows.tasks.modify_partners", "task_modify_partners")
LazyWorkflowInstance("gso.workflows.tasks.delete_partners", "task_delete_partners")
"""A creation workflow that create a partner."""
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State
from orchestrator.workflow import StepList, begin, done, step, workflow
from pydantic import ConfigDict, EmailStr, field_validator
from gso.services.partners import PartnerSchema, create_partner, filter_partners_by_email, filter_partners_by_name
def initial_input_form_generator() -> FormGenerator:
"""Gather input from the user needed for creating a partner."""
class CreatePartnerForm(FormPage):
model_config = ConfigDict(title="Create a Partner")
name: str
email: EmailStr
@field_validator("name")
def validate_name(cls, name: str) -> str:
if filter_partners_by_name(name=name, case_sensitive=False):
msg = "Partner with this name already exists."
raise ValueError(msg)
return name
@field_validator("email")
def validate_email(cls, email: str) -> EmailStr:
email = email.lower()
if filter_partners_by_email(email=email, case_sensitive=False):
msg = "Partner with this email already exists."
raise ValueError(msg)
return email
initial_user_input = yield CreatePartnerForm
return initial_user_input.model_dump()
@step("Save partner information to database")
def save_partner_to_database(
name: str,
email: EmailStr,
) -> State:
"""Save user input as a new partner in database."""
partner = create_partner(
partner_data=PartnerSchema(
name=name,
email=email,
)
)
return {"created_partner": partner}
@workflow(
"Create partners",
initial_input_form=initial_input_form_generator,
target=Target.SYSTEM,
)
def task_create_partners() -> StepList:
"""Create a new Partner."""
return begin >> save_partner_to_database >> done
"""A modification workflow that edit a partner."""
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State
from orchestrator.workflow import StepList, begin, done, step, workflow
from pydantic import ConfigDict, EmailStr
from pydantic_forms.types import UUIDstr
from pydantic_forms.validators import Choice
from gso.services.partners import delete_partner, get_all_partners, get_partner_by_name
def initial_input_form_generator() -> FormGenerator:
"""Gather input from the user needed for deleting a partner."""
partners = {}
for partner in get_all_partners():
partners[partner["partner_id"]] = partner["name"]
partner_choice = Choice("Select a partner", zip(partners.values(), partners.items(), strict=True)) # type: ignore[arg-type]
class SelectPartnerForm(FormPage):
model_config = ConfigDict(title="Delete a Partner")
partners: partner_choice # type: ignore[valid-type]
initial_user_input = yield SelectPartnerForm
partner = get_partner_by_name(name=initial_user_input.partners.name)
return {"email": partner["email"], "name": partner["name"], "partner_id": partner["partner_id"]}
@step("Delete partner information to database")
def delete_partner_from_database(
partner_id: UUIDstr,
name: str,
email: EmailStr,
) -> State:
"""Save user input as a new partner in database."""
delete_partner(partner_id=partner_id)
return {"deleted_partner": {"name": name, "email": email, "partner_id": partner_id}}
@workflow(
"Modify partners",
initial_input_form=initial_input_form_generator,
target=Target.SYSTEM,
)
def task_delete_partners() -> StepList:
"""Delete a Partner."""
return begin >> delete_partner_from_database >> done
"""A modification workflow that edit a partner."""
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State
from orchestrator.workflow import StepList, begin, done, step, workflow
from pydantic import ConfigDict, EmailStr, field_validator
from pydantic_forms.types import UUIDstr
from pydantic_forms.validators import Choice
from gso.services.partners import (
PartnerSchema,
edit_partner,
filter_partners_by_email,
filter_partners_by_name,
get_all_partners,
get_partner_by_name,
)
def initial_input_form_generator() -> FormGenerator:
"""Gather input from the user needed for modifying a partner."""
partners = {}
for partner in get_all_partners():
partners[partner["partner_id"]] = partner["name"]
partner_choice = Choice("Select a partner", zip(partners.values(), partners.items(), strict=True)) # type: ignore[arg-type]
class SelectPartnerForm(FormPage):
model_config = ConfigDict(title="Choose a Partner")
partners: partner_choice # type: ignore[valid-type]
initial_user_input = yield SelectPartnerForm
partner = get_partner_by_name(name=initial_user_input.partners.name)
class ModifyPartnerForm(FormPage):
model_config = ConfigDict(title="Modify a Partner")
name: str = partner["name"]
email: EmailStr = partner["email"]
@field_validator("name")
def validate_name(cls, name: str) -> str:
if partner["name"] != name and filter_partners_by_name(name=name, case_sensitive=False):
msg = "Partner with this name already exists."
raise ValueError(msg)
return name
@field_validator("email")
def validate_email(cls, email: str) -> EmailStr:
if partner["email"] != email and filter_partners_by_email(email=email, case_sensitive=False):
msg = "Partner with this email already exists."
raise ValueError(msg)
return email
user_input = yield ModifyPartnerForm
return user_input.model_dump() | {"partner_id": partner["partner_id"]}
@step("Save partner information to database")
def save_partner_to_database(
partner_id: UUIDstr,
name: str,
email: EmailStr,
) -> State:
"""Save user input as a new partner in database."""
partner = edit_partner(
partner_data=PartnerSchema(
partner_id=partner_id,
name=name,
email=email,
)
)
return {"modified_partner": partner}
@workflow(
"Modify partners",
initial_input_form=initial_input_form_generator,
target=Target.SYSTEM,
)
def task_modify_partners() -> StepList:
"""Modify a new Partner."""
return begin >> save_partner_to_database >> done
...@@ -31,10 +31,8 @@ from sqlalchemy.engine import make_url ...@@ -31,10 +31,8 @@ from sqlalchemy.engine import make_url
from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.orm import scoped_session, sessionmaker
from starlette.testclient import TestClient from starlette.testclient import TestClient
from gso.db.models import PartnerType
from gso.main import init_gso_app from gso.main import init_gso_app
from gso.schema.partner import PartnerCreate from gso.services.partners import PartnerSchema, create_partner
from gso.services.partners import create_partner
from gso.utils.helpers import LAGMember from gso.utils.helpers import LAGMember
from test.fixtures import ( # noqa: F401 from test.fixtures import ( # noqa: F401
iptrunk_side_subscription_factory, iptrunk_side_subscription_factory,
...@@ -267,8 +265,24 @@ def test_client(fastapi_app): ...@@ -267,8 +265,24 @@ def test_client(fastapi_app):
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def geant_partner(): def partner_factory():
return create_partner(PartnerCreate(name="GEANT-TEST", partner_type=PartnerType.GEANT, email="goat-test@geant.org")) def _create_partner(
name: str,
email: str,
) -> dict:
return create_partner(
PartnerSchema(
name=name,
email=email,
)
)
return _create_partner
@pytest.fixture(scope="session")
def geant_partner(partner_factory):
return partner_factory(name="GEANT-TEST", email="goat-test@geant.org")
@pytest.fixture() @pytest.fixture()
......
import pytest
from pydantic_forms.exceptions import FormValidationError
from gso.services.partners import get_partner_by_name
from test.workflows import assert_complete, extract_state, run_workflow
@pytest.mark.workflow()
def test_create_partner_with_minimal_input():
result, _, _ = run_workflow(
"task_create_partners",
[
{
"name": "GEANT-TEST-CREATION",
"email": "goat-test-creation@geant.org",
}
],
)
assert_complete(result)
state = extract_state(result)
partner = get_partner_by_name(state["name"])
assert partner["name"] == "GEANT-TEST-CREATION"
assert partner["email"] == "goat-test-creation@geant.org"
@pytest.mark.workflow()
def test_create_partner_with_full_input():
result, _, _ = run_workflow(
"task_create_partners",
[
{
"name": "GEANT-TEST-CREATION",
"email": "goat-test-creation@geant.org",
}
],
)
assert_complete(result)
state = extract_state(result)
partner = get_partner_by_name(state["name"])
assert partner["name"] == "GEANT-TEST-CREATION"
assert partner["email"] == "goat-test-creation@geant.org"
@pytest.mark.workflow()
def test_create_partner_with_invalid_input_fails():
with pytest.raises(FormValidationError) as error:
run_workflow(
"task_create_partners",
[
{
"name": "Kenneth Boyle",
"email": "invalid_email",
}
],
)
errors = error.value.errors
email_error = errors[0]
assert email_error["loc"] == ("email",)
assert "valid email address" in email_error["msg"]
def test_create_partner_with_duplicate_name_or_email_fails(partner_factory):
partner_factory(
name="new_name",
email="myemail@gmail.com",
)
with pytest.raises(FormValidationError) as error:
run_workflow(
"task_create_partners",
[
{
"name": "NEW_name",
"email": "myemail@gmail.com",
}
],
)
assert error.value.errors[0]["msg"] == "Partner with this name already exists."
assert error.value.errors[1]["msg"] == "Partner with this email already exists."
import pytest
from gso.services.partners import filter_partners_by_name
from test.workflows import assert_complete, run_workflow
@pytest.mark.workflow()
def test_delete_partner_success(partner_factory):
partner = partner_factory(
name="new_name",
email="myemail@gmail.com",
)
assert filter_partners_by_name(name="new_name", case_sensitive=False)
result, _, _ = run_workflow(
"task_delete_partners",
[
{"partners": partner["partner_id"]},
],
)
assert_complete(result)
assert filter_partners_by_name(name="new_name", case_sensitive=False) is None
import pytest
from pydantic_forms.exceptions import FormValidationError
from gso.services.partners import get_partner_by_id
from test.workflows import assert_complete, run_workflow
@pytest.mark.workflow()
def test_modify_partner_success(partner_factory):
partner = partner_factory(
name="new_name",
email="myemail@gmail.com",
)
result, _, _ = run_workflow(
"task_modify_partners",
[
{"partners": partner["partner_id"]},
{
"name": "GEANT-TEST-CREATION",
"email": "goat-test-creation@geant.org",
},
],
)
assert_complete(result)
partner_db = get_partner_by_id(partner_id=partner["partner_id"])
assert partner_db.name == "GEANT-TEST-CREATION"
assert partner_db.email == "goat-test-creation@geant.org"
@pytest.mark.workflow()
def test_modify_partner_with_same_date_success(partner_factory):
partner = partner_factory(
name="new_name",
email="myemail@gmail.com",
)
result, _, _ = run_workflow(
"task_modify_partners",
[
{"partners": partner["partner_id"]},
{
"name": "new_name",
"email": "myemail@gmail.com",
},
],
)
assert_complete(result)
partner_db = get_partner_by_id(partner_id=partner["partner_id"])
assert partner_db.name == "new_name"
assert partner_db.email == "myemail@gmail.com"
@pytest.mark.workflow()
def test_modify_partner_with_duplicate_name_or_email_fails(partner_factory):
partner_factory(
name="new_name",
email="myemail@gmail.com",
)
partner_2 = partner_factory(
name="new_name_2",
email="myemail2@gmail.com",
)
with pytest.raises(FormValidationError) as error:
run_workflow(
"task_modify_partners",
[
{"partners": partner_2["partner_id"]},
{
"name": "new_name",
"email": "myemail@gmail.com",
},
],
)
assert error.value.errors[0]["msg"] == "Partner with this name already exists."
assert error.value.errors[1]["msg"] == "Partner with this email already exists."
@pytest.mark.workflow()
def test_modify_partner_with_invalid_input_fails(partner_factory):
partner = partner_factory(
name="new_name_2",
email="myemail2@gmail.com",
)
with pytest.raises(FormValidationError) as error:
run_workflow(
"task_modify_partners",
[
{"partners": partner["partner_id"]},
{
"name": "Kenneth Boyle",
"email": "invalid_email",
},
],
)
errors = error.value.errors
email_error = errors[0]
assert email_error["loc"] == ("email",)
assert "valid email address" in email_error["msg"]
...@@ -4,7 +4,7 @@ from test.workflows import assert_complete, extract_state, run_workflow ...@@ -4,7 +4,7 @@ from test.workflows import assert_complete, extract_state, run_workflow
@pytest.mark.workflow() @pytest.mark.workflow()
def test_task_validate_geant_products(responses, faker): def test_task_validate_geant_products(responses):
result, _, _ = run_workflow("task_validate_geant_products", [{}]) result, _, _ = run_workflow("task_validate_geant_products", [{}])
assert_complete(result) assert_complete(result)
state = extract_state(result) state = extract_state(result)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment