diff --git a/gso/__init__.py b/gso/__init__.py
index 8ed08f8e1716e9f6e04d6cd244253b43f01875d8..99f1d386081a807a5c7b34527ba890cafe7aab1e 100644
--- a/gso/__init__.py
+++ b/gso/__init__.py
@@ -65,10 +65,11 @@ def init_gso_app() -> OrchestratorCore:
 
 def init_cli_app() -> typer.Typer:
     """Initialise GSO as a CLI application."""
-    from gso.cli import imports, netbox  # noqa: PLC0415
+    from gso.cli import imports, netbox, schedule  # noqa: PLC0415
 
     cli_app.add_typer(imports.app, name="import-cli")
     cli_app.add_typer(netbox.app, name="netbox-cli")
+    cli_app.add_typer(schedule.app, name="schedule-cli")
     return cli_app()
 
 
diff --git a/gso/cli/schedule.py b/gso/cli/schedule.py
new file mode 100644
index 0000000000000000000000000000000000000000..3446977d8a33d6601fd5e037e7723825be6f2cb0
--- /dev/null
+++ b/gso/cli/schedule.py
@@ -0,0 +1,47 @@
+"""CLI for interacting with the task scheduler. Only supports running a single task."""
+
+# <!-- vale off -->
+# Copyright 2019-2020 SURF.
+# Copyright 2025 GÉANT Vereniging.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# <!-- vale on -->
+
+
+import logging
+from collections.abc import Callable
+
+import typer
+
+from gso.schedules.clean_old_tasks import clean_old_tasks
+from gso.schedules.send_email_notifications import send_email_notifications
+from gso.schedules.validate_products import validate_products
+from gso.schedules.validate_subscriptions import validate_subscriptions
+
+logger = logging.getLogger(__name__)
+
+app: typer.Typer = typer.Typer()
+
+ALL_SCHEDULES: list[Callable] = [
+    clean_old_tasks,
+    send_email_notifications,
+    validate_products,
+    validate_subscriptions,
+]
+
+
+@app.command()
+def run_task(task_name: str) -> None:
+    """Force the execution of a task by name."""
+    for s in ALL_SCHEDULES:
+        if task_name == s.__name__:
+            s()