diff --git a/gso/db/models.py b/gso/db/models.py
index 02d8c59ce72e6e1ecb47bab5c513853a9f59551b..c6382b1c81d06f9192ed4f416e186a2d990a5a45 100644
--- a/gso/db/models.py
+++ b/gso/db/models.py
@@ -1,50 +1,25 @@
 """Database model definitions and table mappings for the GSO system."""
 
-import enum
-
 import structlog
 from orchestrator.db import UtcTimestamp
 from orchestrator.db.database import BaseModel
 from sqlalchemy import (
-    Enum,
     String,
     text,
 )
-from sqlalchemy.dialects.postgresql import ARRAY
 from sqlalchemy.orm import mapped_column
 
 logger = structlog.get_logger(__name__)
 
 
-class PartnerType(str, enum.Enum):
-    """Defining different types of partners in the GSO system."""
-
-    NREN = "NREN"
-    RE_PEER = "RE_PEER"
-    PUBLIC_PEER = "PUBLIC_PEER"
-    PRIVATE_PEER = "PRIVATE_PEER"
-    UPSTREAM = "UPSTREAM"
-    GEANT = "GEANT"
-
-
 class PartnerTable(BaseModel):
     """Database table for the partners in the GSO system."""
 
     __tablename__ = "partners"
 
     partner_id = mapped_column(String, server_default=text("uuid_generate_v4"), primary_key=True)
-    name = mapped_column(String, unique=True, nullable=True)
+    name = mapped_column(String, unique=True, nullable=False)
     email = mapped_column(String, unique=True, nullable=False)
-    partner_type = mapped_column(Enum(PartnerType), nullable=False)
-
-    as_number = mapped_column(
-        String, unique=True, nullable=True
-    )  # the as_number and as_set are mutually exclusive. if you give me one I don't need the other
-    as_set = mapped_column(String, nullable=True)
-    route_set = mapped_column(String, nullable=True)
-    black_listed_as_sets = mapped_column(ARRAY(String), nullable=True)
-    additional_routers = mapped_column(ARRAY(String), nullable=True)
-    additional_bgp_speakers = mapped_column(ARRAY(String), nullable=True)
 
     created_at = mapped_column(UtcTimestamp, server_default=text("current_timestamp"), nullable=False)
     updated_at = mapped_column(
diff --git a/gso/migrations/versions/2024-07-29_41fd1ae225aq_create_modify_delete_partner_task.py b/gso/migrations/versions/2024-07-29_41fd1ae225aq_create_modify_delete_partner_task.py
new file mode 100644
index 0000000000000000000000000000000000000000..26cc705fd5b856cd5a204c1d9dda93eb42028537
--- /dev/null
+++ b/gso/migrations/versions/2024-07-29_41fd1ae225aq_create_modify_delete_partner_task.py
@@ -0,0 +1,73 @@
+"""Add task_create_partners, task_modify_partners and task_delete_partners.
+
+Revision ID: 41fd1ae225aq
+Revises: 31fd1ae8d5bb
+Create Date: 2024-07-29 17:11:00.00000
+
+"""
+
+from uuid import uuid4
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "41fd1ae225aq"
+down_revision = "31fd1ae8d5bb"
+branch_labels = None
+depends_on = None
+
+workflows = [
+    {"name": "task_create_partners", "description": "Create partners", "workflow_id": uuid4(), "target": "SYSTEM"},
+    {"name": "task_modify_partners", "description": "Modify partners", "workflow_id": uuid4(), "target": "SYSTEM"},
+    {"name": "task_delete_partners", "description": "Delete partners", "workflow_id": uuid4(), "target": "SYSTEM"},
+]
+
+
+def upgrade() -> None:
+    conn = op.get_bind()
+    for workflow in workflows:
+        conn.execute(
+            sa.text(
+                "INSERT INTO workflows VALUES (:workflow_id, :name, :target, :description, now()) ON CONFLICT DO NOTHING"
+            ),
+            workflow,
+        )
+
+    op.alter_column('partners', 'email',
+                    existing_type=sa.String(),
+                    nullable=False)
+    op.drop_column(
+        'partners',
+        'partner_type'
+    )
+    op.drop_column(
+        'partners',
+        'as_number'
+    )
+    op.drop_column(
+        'partners',
+        'as_set'
+    )
+    op.drop_column(
+        'partners',
+        'route_set'
+    )
+    op.drop_column(
+        'partners',
+        'black_listed_as_sets'
+    )
+    op.drop_column(
+        'partners',
+        'additional_routers'
+    )
+    op.drop_column(
+        'partners',
+        'additional_bgp_speakers'
+    )
+
+
+def downgrade() -> None:
+    conn = op.get_bind()
+    for workflow in workflows:
+        conn.execute(sa.text("DELETE FROM workflows WHERE name = :name"), {"name": workflow["name"]})
diff --git a/gso/schema/__init__.py b/gso/schema/__init__.py
deleted file mode 100644
index 20b21e2c5736e9d2890482561fd61bc06a84e3c9..0000000000000000000000000000000000000000
--- a/gso/schema/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-"""It is used to group the schema files together as a package."""
diff --git a/gso/schema/partner.py b/gso/schema/partner.py
deleted file mode 100644
index b1c58c2cf91bf544501f6b2e316117b8b83a70c9..0000000000000000000000000000000000000000
--- a/gso/schema/partner.py
+++ /dev/null
@@ -1,26 +0,0 @@
-"""Partner schema module."""
-
-from datetime import datetime
-from uuid import uuid4
-
-from pydantic import BaseModel, ConfigDict, EmailStr, Field
-
-from gso.db.models import PartnerType
-
-
-class PartnerCreate(BaseModel):
-    """Partner create schema."""
-
-    partner_id: str = Field(default_factory=lambda: str(uuid4()))
-    name: str
-    email: EmailStr | None = None
-    as_number: str | None = None
-    as_set: str | None = None
-    route_set: str | None = None
-    black_listed_as_sets: list[str] | None = None
-    additional_routers: list[str] | None = None
-    additional_bgp_speakers: list[str] | None = None
-    partner_type: PartnerType
-    created_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
-    updated_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
-    model_config = ConfigDict(from_attributes=True)
diff --git a/gso/services/partners.py b/gso/services/partners.py
index 6c425bdad2432634d27780c85541bd9a02b94b7f..63d55bb3f3d4044f60ce90f4f6cbd839e1efa069 100644
--- a/gso/services/partners.py
+++ b/gso/services/partners.py
@@ -1,12 +1,27 @@
 """A module that returns the partners available in :term:`GSO`."""
 
+from datetime import datetime
 from typing import Any
+from uuid import uuid4
 
 from orchestrator.db import db
-from sqlalchemy.exc import NoResultFound, SQLAlchemyError
+from pydantic import BaseModel, ConfigDict, EmailStr, Field
+from sqlalchemy import func
+from sqlalchemy.exc import NoResultFound
 
 from gso.db.models import PartnerTable
-from gso.schema.partner import PartnerCreate
+
+
+class PartnerSchema(BaseModel):
+    """Partner schema."""
+
+    partner_id: str = Field(default_factory=lambda: str(uuid4()))
+    name: str
+    email: EmailStr
+
+    created_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
+    updated_at: datetime = Field(default_factory=lambda: datetime.now().astimezone())
+    model_config = ConfigDict(from_attributes=True)
 
 
 class PartnerNotFoundError(Exception):
@@ -22,46 +37,84 @@ def get_all_partners() -> list[dict]:
 def get_partner_by_name(name: str) -> dict[str, Any]:
     """Try to get a partner by their name."""
     try:
-        partner = PartnerTable.query.filter(PartnerTable.name == name).one()
+        partner = db.session.query(PartnerTable).filter(PartnerTable.name == name).one()
         return partner.__json__()
     except NoResultFound as e:
         msg = f"partner {name} not found"
         raise PartnerNotFoundError(msg) from e
 
 
+def get_partner_by_id(partner_id: str) -> PartnerTable:
+    """Try to get a partner by their id."""
+    partner = db.session.query(PartnerTable).filter_by(partner_id=partner_id).first()
+    if not partner:
+        raise PartnerNotFoundError
+
+    return partner
+
+
+def filter_partners_by_attribute(
+    attribute: str, value: str, *, case_sensitive: bool = True
+) -> list[dict[str, Any]] | None:
+    """Filter the list of partners by a specified attribute."""
+    if case_sensitive:
+        partners = db.session.query(PartnerTable).filter(getattr(PartnerTable, attribute) == value).all()
+    else:
+        partners = (
+            db.session.query(PartnerTable)
+            .filter(func.lower(getattr(PartnerTable, attribute)) == func.lower(value))
+            .all()
+        )
+
+    return [partner.__json__() for partner in partners] if partners else None
+
+
+def filter_partners_by_name(name: str, *, case_sensitive: bool = True) -> list[dict[str, Any]] | None:
+    """Filter the list of partners by name."""
+    return filter_partners_by_attribute("name", name, case_sensitive=case_sensitive)
+
+
+def filter_partners_by_email(email: str, *, case_sensitive: bool = True) -> list[dict[str, Any]] | None:
+    """Filter the list of partners by email."""
+    return filter_partners_by_attribute("email", email, case_sensitive=case_sensitive)
+
+
 def create_partner(
-    partner_data: PartnerCreate,
+    partner_data: PartnerSchema,
 ) -> dict:
     """Create a new partner and add it to the database using Pydantic schema for validation.
 
     :param partner_data: Partner data validated by Pydantic schema.
     :return: JSON representation of the created partner.
     """
-    try:
-        new_partner = PartnerTable(**partner_data.model_dump())
+    new_partner = PartnerTable(**partner_data.model_dump())
+    db.session.add(new_partner)
+    db.session.commit()
 
-        db.session.add(new_partner)
-        db.session.commit()
+    return new_partner.__json__()
 
-        return new_partner.__json__()
-    except SQLAlchemyError:
-        db.session.rollback()
-        raise
-    finally:
-        db.session.close()
 
+def edit_partner(
+    partner_data: PartnerSchema,
+) -> PartnerTable:
+    """Edit an existing partner and update it in the database."""
+    partner = get_partner_by_id(partner_id=partner_data.partner_id)
 
-def delete_partner_by_name(name: str) -> None:
-    """Delete a partner by their name."""
-    try:
-        partner = PartnerTable.query.filter(PartnerTable.name == name).one()
-        db.session.delete(partner)
-        db.session.commit()
-    except NoResultFound as e:
-        msg = f"partner {name} not found"
-        raise PartnerNotFoundError(msg) from e
-    except SQLAlchemyError:
-        db.session.rollback()
-        raise
-    finally:
-        db.session.close()
+    if partner_data.name:
+        partner.name = partner_data.name
+    if partner_data.email:
+        partner.email = partner_data.email
+
+    partner.updated_at = datetime.now().astimezone()
+
+    db.session.commit()
+
+    return partner
+
+
+def delete_partner(partner_id: str) -> None:
+    """Delete an existing partner from the database."""
+    partner = get_partner_by_id(partner_id=partner_id)
+
+    db.session.delete(partner)
+    db.session.commit()
diff --git a/gso/services/subscriptions.py b/gso/services/subscriptions.py
index 029a4faa14b65b032db64859dfb739f64343ca37..4283ba65248d4942333d8964fb6b5ca8e7bc5649 100644
--- a/gso/services/subscriptions.py
+++ b/gso/services/subscriptions.py
@@ -15,6 +15,7 @@ from orchestrator.db import (
     SubscriptionInstanceTable,
     SubscriptionInstanceValueTable,
     SubscriptionTable,
+    db,
 )
 from orchestrator.domain import SubscriptionModel
 from orchestrator.services.subscriptions import query_in_use_by_subscriptions
@@ -29,10 +30,11 @@ SubscriptionType = dict[str, Any]
 
 
 def get_subscriptions(
-    product_types: list[ProductType],
+    product_types: list[ProductType] | None = None,
     lifecycles: list[SubscriptionLifecycle] | None = None,
     includes: list[str] | None = None,
     excludes: list[str] | None = None,
+    partner_id: UUIDstr | None = None,
 ) -> list[SubscriptionType]:
     """Retrieve active subscriptions for a specific product type.
 
@@ -40,13 +42,11 @@ def get_subscriptions(
     :param SubscriptionLifecycle lifecycles: The lifecycles that the products must be in.
     :param list[str] includes: List of fields to be included in the returned Subscription objects.
     :param list[str] excludes: List of fields to be excluded from the returned Subscription objects.
+    :param UUIDstr partner_id: The partner_id of subscriptions.
 
     :return: A list of Subscription objects that match the query.
     :rtype: list[Subscription]
     """
-    if not lifecycles:
-        lifecycles = list(SubscriptionLifecycle)
-
     if not includes:
         includes = [col.name for col in SubscriptionTable.__table__.columns]
 
@@ -55,10 +55,16 @@ def get_subscriptions(
 
     dynamic_fields = [getattr(SubscriptionTable, field) for field in includes]
 
-    query = SubscriptionTable.query.join(ProductTable).filter(
-        ProductTable.product_type.in_([str(product_type) for product_type in product_types]),
-        SubscriptionTable.status.in_([str(lifecycle) for lifecycle in lifecycles]),
-    )
+    query = db.session.query(SubscriptionTable).join(ProductTable)
+
+    if product_types:
+        query = query.filter(ProductTable.product_type.in_([str(product_type) for product_type in product_types]))
+
+    if lifecycles:
+        query = query.filter(SubscriptionTable.status.in_([str(lifecycle) for lifecycle in lifecycles]))
+
+    if partner_id:
+        query = query.filter(SubscriptionTable.customer_id == partner_id)
 
     results = query.with_entities(*dynamic_fields).all()
 
diff --git a/gso/translations/en-GB.json b/gso/translations/en-GB.json
index 34420d58179aec73645b47c9e1cc0860e0d94b9d..c74df450a1c5b332b7dbc02cb96742dbb92d5901 100644
--- a/gso/translations/en-GB.json
+++ b/gso/translations/en-GB.json
@@ -68,6 +68,9 @@
         "validate_iptrunk": "Validate IP Trunk configuration",
         "validate_router": "Validate router configuration",
         "task_validate_geant_products": "Validation task for GEANT products",
-        "task_send_email_notifications": "Send email notifications for failed tasks"
+        "task_send_email_notifications": "Send email notifications for failed tasks",
+        "task_create_partners": "Create partner task",
+        "task_modify_partners": "Modify partner task",
+        "task_delete_partners": "Delete partner task"
     }
 }
diff --git a/gso/workflows/__init__.py b/gso/workflows/__init__.py
index 992311700e539531a96a03388ff0469db239f043..56561838557882cac784faed6745c1d80661a0cd 100644
--- a/gso/workflows/__init__.py
+++ b/gso/workflows/__init__.py
@@ -69,3 +69,6 @@ LazyWorkflowInstance("gso.workflows.opengear.import_opengear", "import_opengear"
 #  Tasks
 LazyWorkflowInstance("gso.workflows.tasks.send_email_notifications", "task_send_email_notifications")
 LazyWorkflowInstance("gso.workflows.tasks.validate_geant_products", "task_validate_geant_products")
+LazyWorkflowInstance("gso.workflows.tasks.create_partners", "task_create_partners")
+LazyWorkflowInstance("gso.workflows.tasks.modify_partners", "task_modify_partners")
+LazyWorkflowInstance("gso.workflows.tasks.delete_partners", "task_delete_partners")
diff --git a/gso/workflows/tasks/create_partners.py b/gso/workflows/tasks/create_partners.py
new file mode 100644
index 0000000000000000000000000000000000000000..b04c5c68d4f2ba7ddc350c87d047313af111e23c
--- /dev/null
+++ b/gso/workflows/tasks/create_partners.py
@@ -0,0 +1,66 @@
+"""A creation workflow that create a partner."""
+
+from orchestrator.forms import FormPage
+from orchestrator.targets import Target
+from orchestrator.types import FormGenerator, State
+from orchestrator.workflow import StepList, begin, done, step, workflow
+from pydantic import ConfigDict, EmailStr, field_validator
+
+from gso.services.partners import PartnerSchema, create_partner, filter_partners_by_email, filter_partners_by_name
+
+
+def initial_input_form_generator() -> FormGenerator:
+    """Gather input from the user needed for creating a partner."""
+
+    class CreatePartnerForm(FormPage):
+        model_config = ConfigDict(title="Create a Partner")
+
+        name: str
+        email: EmailStr
+
+        @field_validator("name")
+        def validate_name(cls, name: str) -> str:
+            if filter_partners_by_name(name=name, case_sensitive=False):
+                msg = "Partner with this name already exists."
+                raise ValueError(msg)
+
+            return name
+
+        @field_validator("email")
+        def validate_email(cls, email: str) -> EmailStr:
+            email = email.lower()
+            if filter_partners_by_email(email=email, case_sensitive=False):
+                msg = "Partner with this email already exists."
+                raise ValueError(msg)
+
+            return email
+
+    initial_user_input = yield CreatePartnerForm
+
+    return initial_user_input.model_dump()
+
+
+@step("Save partner information to database")
+def save_partner_to_database(
+    name: str,
+    email: EmailStr,
+) -> State:
+    """Save user input as a new partner in database."""
+    partner = create_partner(
+        partner_data=PartnerSchema(
+            name=name,
+            email=email,
+        )
+    )
+
+    return {"created_partner": partner}
+
+
+@workflow(
+    "Create partners",
+    initial_input_form=initial_input_form_generator,
+    target=Target.SYSTEM,
+)
+def task_create_partners() -> StepList:
+    """Create a new Partner."""
+    return begin >> save_partner_to_database >> done
diff --git a/gso/workflows/tasks/delete_partners.py b/gso/workflows/tasks/delete_partners.py
new file mode 100644
index 0000000000000000000000000000000000000000..a98e051dd7b6cb40f35c5732a33d0f558a74a6bc
--- /dev/null
+++ b/gso/workflows/tasks/delete_partners.py
@@ -0,0 +1,63 @@
+"""A delete workflow that remove a partner."""
+
+from enum import Enum
+
+from orchestrator.forms import FormPage
+from orchestrator.targets import Target
+from orchestrator.types import FormGenerator, State
+from orchestrator.workflow import StepList, begin, done, step, workflow
+from pydantic import ConfigDict, EmailStr, field_validator
+from pydantic_forms.types import UUIDstr
+from pydantic_forms.validators import Choice
+
+from gso.services.partners import delete_partner, get_all_partners, get_partner_by_name
+from gso.services.subscriptions import get_subscriptions
+
+
+def initial_input_form_generator() -> FormGenerator:
+    """Gather input from the user needed for deleting a partner."""
+    partners = {}
+    for partner in get_all_partners():
+        partners[partner["partner_id"]] = partner["name"]
+
+    partner_choice = Choice("Select a partner", zip(partners.values(), partners.items(), strict=True))  # type: ignore[arg-type]
+
+    class SelectPartnerForm(FormPage):
+        model_config = ConfigDict(title="Delete a Partner")
+        partners: partner_choice  # type: ignore[valid-type]
+
+        @field_validator("partners")
+        def validate_partners(cls, value: Enum) -> Enum:
+            if get_subscriptions(partner_id=str(value)):
+                msg = "This partner has associated data and cannot be removed."
+                raise ValueError(msg)
+
+            return value
+
+    initial_user_input = yield SelectPartnerForm
+
+    partner = get_partner_by_name(name=initial_user_input.partners.name)
+
+    return {"email": partner["email"], "name": partner["name"], "partner_id": partner["partner_id"]}
+
+
+@step("Delete partner information to database")
+def delete_partner_from_database(
+    partner_id: UUIDstr,
+    name: str,
+    email: EmailStr,
+) -> State:
+    """Delete a partner from database."""
+    delete_partner(partner_id=partner_id)
+
+    return {"deleted_partner": {"name": name, "email": email, "partner_id": partner_id}}
+
+
+@workflow(
+    "Delete partners",
+    initial_input_form=initial_input_form_generator,
+    target=Target.SYSTEM,
+)
+def task_delete_partners() -> StepList:
+    """Delete a Partner."""
+    return begin >> delete_partner_from_database >> done
diff --git a/gso/workflows/tasks/modify_partners.py b/gso/workflows/tasks/modify_partners.py
new file mode 100644
index 0000000000000000000000000000000000000000..0e82521c3ee72cbc8912b5cdde136387c1948cea
--- /dev/null
+++ b/gso/workflows/tasks/modify_partners.py
@@ -0,0 +1,90 @@
+"""A modification workflow that modifies a partner."""
+
+from orchestrator.forms import FormPage
+from orchestrator.targets import Target
+from orchestrator.types import FormGenerator, State
+from orchestrator.workflow import StepList, begin, done, step, workflow
+from pydantic import ConfigDict, EmailStr, field_validator
+from pydantic_forms.types import UUIDstr
+from pydantic_forms.validators import Choice
+
+from gso.services.partners import (
+    PartnerSchema,
+    edit_partner,
+    filter_partners_by_email,
+    filter_partners_by_name,
+    get_all_partners,
+    get_partner_by_name,
+)
+
+
+def initial_input_form_generator() -> FormGenerator:
+    """Gather input from the user needed for modifying a partner."""
+    partners = {}
+    for partner in get_all_partners():
+        partners[partner["partner_id"]] = partner["name"]
+
+    partner_choice = Choice("Select a partner", zip(partners.values(), partners.items(), strict=True))  # type: ignore[arg-type]
+
+    class SelectPartnerForm(FormPage):
+        model_config = ConfigDict(title="Choose a Partner")
+
+        partners: partner_choice  # type: ignore[valid-type]
+
+    initial_user_input = yield SelectPartnerForm
+
+    partner = get_partner_by_name(name=initial_user_input.partners.name)
+
+    class ModifyPartnerForm(FormPage):
+        model_config = ConfigDict(title="Modify a Partner")
+
+        name: str = partner["name"]
+        email: EmailStr = partner["email"]
+
+        @field_validator("name")
+        def validate_name(cls, name: str) -> str:
+            if partner["name"] != name and filter_partners_by_name(name=name, case_sensitive=False):
+                msg = "Partner with this name already exists."
+                raise ValueError(msg)
+
+            return name
+
+        @field_validator("email")
+        def validate_email(cls, email: str) -> EmailStr:
+            if partner["email"] != email and filter_partners_by_email(email=email, case_sensitive=False):
+                msg = "Partner with this email already exists."
+                raise ValueError(msg)
+
+            return email
+
+    user_input = yield ModifyPartnerForm
+
+    return user_input.model_dump() | {"partner_id": partner["partner_id"]}
+
+
+@step("Save partner information to database")
+def save_partner_to_database(
+    partner_id: UUIDstr,
+    name: str,
+    email: EmailStr,
+) -> State:
+    """Save modified partner in database."""
+    partner = edit_partner(
+        partner_data=PartnerSchema(
+            partner_id=partner_id,
+            name=name,
+            email=email,
+        )
+    )
+
+    return {"modified_partner": partner}
+
+
+@workflow(
+    "Modify partners",
+    initial_input_form=initial_input_form_generator,
+    target=Target.SYSTEM,
+)
+def task_modify_partners() -> StepList:
+    """Modify a Partner."""
+    return begin >> save_partner_to_database >> done
diff --git a/test/conftest.py b/test/conftest.py
index e700a239c4d7e8fbdd35945c67db05da312874a4..0c4e36e4f52d60d8270493b9dd234ffb080ba507 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -32,10 +32,8 @@ from sqlalchemy.orm import scoped_session, sessionmaker
 from starlette.testclient import TestClient
 from urllib3_mock import Responses
 
-from gso.db.models import PartnerType
 from gso.main import init_gso_app
-from gso.schema.partner import PartnerCreate
-from gso.services.partners import create_partner
+from gso.services.partners import PartnerSchema, create_partner
 from gso.utils.helpers import LAGMember
 from test.fixtures import (  # noqa: F401
     iptrunk_side_subscription_factory,
@@ -268,8 +266,24 @@ def test_client(fastapi_app):
 
 
 @pytest.fixture(scope="session")
-def geant_partner():
-    return create_partner(PartnerCreate(name="GEANT-TEST", partner_type=PartnerType.GEANT, email="goat-test@geant.org"))
+def partner_factory():
+    def _create_partner(
+        name: str,
+        email: str,
+    ) -> dict:
+        return create_partner(
+            PartnerSchema(
+                name=name,
+                email=email,
+            )
+        )
+
+    return _create_partner
+
+
+@pytest.fixture(scope="session")
+def geant_partner(partner_factory):
+    return partner_factory(name="GEANT-TEST", email="goat-test@geant.org")
 
 
 @pytest.fixture()
diff --git a/test/fixtures.py b/test/fixtures.py
index 14ce92aec90a4a5371b8b929f0c21ccb25b42e33..84481eda9c9d816e438414636cd335ecb2ef1b63 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -6,12 +6,19 @@ from uuid import uuid4
 import pytest
 from orchestrator import step, workflow
 from orchestrator.config.assignee import Assignee
-from orchestrator.db import db
+from orchestrator.db import (
+    ProductTable,
+    SubscriptionInstanceTable,
+    SubscriptionInstanceValueTable,
+    SubscriptionTable,
+    db,
+)
 from orchestrator.domain import SubscriptionModel
 from orchestrator.types import SubscriptionLifecycle, UUIDstr
+from orchestrator.utils.datetime import nowtz
 from orchestrator.workflow import done, init, inputstep
 from pydantic_forms.core import FormPage
-from pydantic_forms.types import FormGenerator
+from pydantic_forms.types import FormGenerator, SubscriptionMapping
 from pydantic_forms.validators import Choice
 
 from gso.products import ProductName
@@ -559,3 +566,73 @@ def test_workflow(generic_subscription_1: UUIDstr, generic_product_type_1) -> Ge
 
     with WorkflowInstanceForTests(workflow_for_testing_processes_py, "workflow_for_testing_processes_py") as wf:
         yield wf
+
+
+def create_subscription_for_mapping(
+    product: ProductTable, mapping: SubscriptionMapping, values: dict[str, Any], **kwargs: Any
+) -> SubscriptionTable:
+    """Create a subscription in the test coredb for the given subscription_mapping and values.
+
+    This function handles optional resource types starting with a ? in the mapping not supplied in the values array.
+
+    Args:
+        product: the ProductTable to create a sub for
+        mapping: the subscription_mapping belonging to that product
+        values: a dictionary of keys from the sub_map and their corresponding test values
+        kwargs: The rest of the arguments
+
+    Returns: The conforming subscription.
+    """
+
+    def build_instance(name, value_mapping):
+        block = product.find_block_by_name(name)
+
+        def build_value(rt, value):
+            resource_type = block.find_resource_type_by_name(rt)
+            return SubscriptionInstanceValueTable(resource_type_id=resource_type.resource_type_id, value=value)
+
+        return SubscriptionInstanceTable(
+            product_block_id=block.product_block_id,
+            values=[
+                build_value(resource_type, values[value_key]) for (resource_type, value_key) in value_mapping.items()
+            ],
+        )
+
+    # recreate the mapping: leave out the ?keys if no value supplied for them
+    mapping = {
+        name: [
+            {
+                **{k: value_map[k] for k in value_map if not value_map[k].startswith("?")},
+                **{
+                    k: value_map[k][1:]
+                    for k in value_map
+                    if value_map[k].startswith("?") and value_map[k][1:] in values
+                },
+            }
+            for value_map in mapping[name]
+        ]
+        for name in mapping
+    }
+
+    instances = [
+        build_instance(name, value_mapping)
+        for (name, value_mappings) in mapping.items()
+        for value_mapping in value_mappings
+    ]
+
+    return create_subscription(instances=instances, product=product, **kwargs)
+
+
+def create_subscription(**kwargs):
+    attrs = {
+        "description": "A subscription.",
+        "customer_id": kwargs.get("customer_id", "85938c4c-0a11-e511-80d0-005056956c1a"),
+        "start_date": nowtz(),
+        "status": "active",
+        "insync": True,
+        **kwargs,
+    }
+    o = SubscriptionTable(**attrs)
+    db.session.add(o)
+    db.session.commit()
+    return o
diff --git a/test/workflows/tasks/test_create_partners.py b/test/workflows/tasks/test_create_partners.py
new file mode 100644
index 0000000000000000000000000000000000000000..4a902aa42771f74ee6cebb4b8ba78c92a79b2253
--- /dev/null
+++ b/test/workflows/tasks/test_create_partners.py
@@ -0,0 +1,65 @@
+import pytest
+from pydantic_forms.exceptions import FormValidationError
+
+from gso.services.partners import get_partner_by_name
+from test.workflows import assert_complete, extract_state, run_workflow
+
+
+@pytest.mark.workflow()
+def test_create_partner_success():
+    result, _, _ = run_workflow(
+        "task_create_partners",
+        [
+            {
+                "name": "GEANT-TEST-CREATION",
+                "email": "goat-test-creation@geant.org",
+            }
+        ],
+    )
+    assert_complete(result)
+    state = extract_state(result)
+
+    partner = get_partner_by_name(state["name"])
+    assert partner["name"] == "GEANT-TEST-CREATION"
+    assert partner["email"] == "goat-test-creation@geant.org"
+
+
+@pytest.mark.workflow()
+def test_create_partner_with_invalid_input_fails():
+    with pytest.raises(FormValidationError) as error:
+        run_workflow(
+            "task_create_partners",
+            [
+                {
+                    "name": "Kenneth Boyle",
+                    "email": "invalid_email",
+                }
+            ],
+        )
+
+    errors = error.value.errors
+
+    email_error = errors[0]
+    assert email_error["loc"] == ("email",)
+    assert "valid email address" in email_error["msg"]
+
+
+def test_create_partner_with_duplicate_name_or_email_fails(partner_factory):
+    partner_factory(
+        name="new_name",
+        email="myemail@gmail.com",
+    )
+
+    with pytest.raises(FormValidationError) as error:
+        run_workflow(
+            "task_create_partners",
+            [
+                {
+                    "name": "NEW_name",
+                    "email": "myemail@gmail.com",
+                }
+            ],
+        )
+
+    assert error.value.errors[0]["msg"] == "Partner with this name already exists."
+    assert error.value.errors[1]["msg"] == "Partner with this email already exists."
diff --git a/test/workflows/tasks/test_delete_partners.py b/test/workflows/tasks/test_delete_partners.py
new file mode 100644
index 0000000000000000000000000000000000000000..b0964bdfc93be71624a8f947876cae956a4810b2
--- /dev/null
+++ b/test/workflows/tasks/test_delete_partners.py
@@ -0,0 +1,60 @@
+from uuid import uuid4
+
+import pytest
+from orchestrator.db import ProductTable, db
+from pydantic_forms.exceptions import FormValidationError
+from sqlalchemy import select
+
+from gso.services.partners import filter_partners_by_name
+from test.fixtures import create_subscription_for_mapping
+from test.workflows import assert_complete, run_workflow
+
+CORRECT_SUBSCRIPTION = str(uuid4())
+
+
+def get_one_product(product_name):
+    return db.session.scalars(select(ProductTable).where(ProductTable.name == product_name)).one()
+
+
+@pytest.mark.workflow()
+def test_delete_partner_success(partner_factory):
+    partner = partner_factory(
+        name="new_name",
+        email="myemail@gmail.com",
+    )
+
+    assert filter_partners_by_name(name="new_name", case_sensitive=False)
+
+    result, _, _ = run_workflow(
+        "task_delete_partners",
+        [
+            {"partners": partner["partner_id"]},
+        ],
+    )
+    assert_complete(result)
+
+    assert filter_partners_by_name(name="new_name", case_sensitive=False) is None
+
+
+def test_delete_partner_with_associated_data_fails(generic_product_3, partner_factory):
+    partner = partner_factory(
+        name="new_name",
+        email="myemail@gmail.com",
+    )
+
+    subscription_mapping = {"PB_2": [{"rt_3": "info.id", "rt_2": "info2.id"}]}
+    values = {"info.id": "0", "info2.id": "X"}
+    product = get_one_product("Product 3")
+    create_subscription_for_mapping(
+        product, subscription_mapping, values, subscription_id=CORRECT_SUBSCRIPTION, customer_id=partner["partner_id"]
+    )
+
+    with pytest.raises(FormValidationError) as error:
+        run_workflow(
+            "task_delete_partners",
+            [
+                {"partners": partner["partner_id"]},
+            ],
+        )
+
+    assert error.value.errors[0]["msg"] == "This partner has associated data and cannot be removed."
diff --git a/test/workflows/tasks/test_modify_partners.py b/test/workflows/tasks/test_modify_partners.py
new file mode 100644
index 0000000000000000000000000000000000000000..23559734df77b9a558b8eacbaf0677a0da172a0b
--- /dev/null
+++ b/test/workflows/tasks/test_modify_partners.py
@@ -0,0 +1,103 @@
+import pytest
+from pydantic_forms.exceptions import FormValidationError
+
+from gso.services.partners import get_partner_by_id
+from test.workflows import assert_complete, run_workflow
+
+
+@pytest.mark.workflow()
+def test_modify_partner_success(partner_factory):
+    partner = partner_factory(
+        name="new_name",
+        email="myemail@gmail.com",
+    )
+    result, _, _ = run_workflow(
+        "task_modify_partners",
+        [
+            {"partners": partner["partner_id"]},
+            {
+                "name": "GEANT-TEST-CREATION",
+                "email": "goat-test-creation@geant.org",
+            },
+        ],
+    )
+    assert_complete(result)
+
+    partner_db = get_partner_by_id(partner_id=partner["partner_id"])
+    assert partner_db.name == "GEANT-TEST-CREATION"
+    assert partner_db.email == "goat-test-creation@geant.org"
+
+
+@pytest.mark.workflow()
+def test_modify_partner_with_same_date_success(partner_factory):
+    partner = partner_factory(
+        name="new_name",
+        email="myemail@gmail.com",
+    )
+    result, _, _ = run_workflow(
+        "task_modify_partners",
+        [
+            {"partners": partner["partner_id"]},
+            {
+                "name": "new_name",
+                "email": "myemail@gmail.com",
+            },
+        ],
+    )
+    assert_complete(result)
+
+    partner_db = get_partner_by_id(partner_id=partner["partner_id"])
+    assert partner_db.name == "new_name"
+    assert partner_db.email == "myemail@gmail.com"
+
+
+@pytest.mark.workflow()
+def test_modify_partner_with_duplicate_name_or_email_fails(partner_factory):
+    partner_factory(
+        name="new_name",
+        email="myemail@gmail.com",
+    )
+    partner_2 = partner_factory(
+        name="new_name_2",
+        email="myemail2@gmail.com",
+    )
+
+    with pytest.raises(FormValidationError) as error:
+        run_workflow(
+            "task_modify_partners",
+            [
+                {"partners": partner_2["partner_id"]},
+                {
+                    "name": "new_name",
+                    "email": "myemail@gmail.com",
+                },
+            ],
+        )
+
+    assert error.value.errors[0]["msg"] == "Partner with this name already exists."
+    assert error.value.errors[1]["msg"] == "Partner with this email already exists."
+
+
+@pytest.mark.workflow()
+def test_modify_partner_with_invalid_input_fails(partner_factory):
+    partner = partner_factory(
+        name="new_name_2",
+        email="myemail2@gmail.com",
+    )
+    with pytest.raises(FormValidationError) as error:
+        run_workflow(
+            "task_modify_partners",
+            [
+                {"partners": partner["partner_id"]},
+                {
+                    "name": "Kenneth Boyle",
+                    "email": "invalid_email",
+                },
+            ],
+        )
+
+    errors = error.value.errors
+
+    email_error = errors[0]
+    assert email_error["loc"] == ("email",)
+    assert "valid email address" in email_error["msg"]
diff --git a/test/workflows/tasks/test_task_validate_products.py b/test/workflows/tasks/test_task_validate_products.py
index 0dba0817344e426db93a3603815bc669ff7fc510..b12b3a4a6b0190f7d150faf54d9c61d466f4f8de 100644
--- a/test/workflows/tasks/test_task_validate_products.py
+++ b/test/workflows/tasks/test_task_validate_products.py
@@ -4,7 +4,7 @@ from test.workflows import assert_complete, extract_state, run_workflow
 
 
 @pytest.mark.workflow()
-def test_task_validate_geant_products(responses, faker):
+def test_task_validate_geant_products(responses):
     result, _, _ = run_workflow("task_validate_geant_products", [{}])
     assert_complete(result)
     state = extract_state(result)