From 1a277beb99d44dce49c87f37eb8783a7496952f2 Mon Sep 17 00:00:00 2001 From: Karel van Klink <karel.vanklink@geant.org> Date: Fri, 9 May 2025 15:35:14 +0200 Subject: [PATCH] Add CLI for running scheduled tasks --- gso/__init__.py | 3 ++- gso/cli/schedule.py | 47 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 gso/cli/schedule.py diff --git a/gso/__init__.py b/gso/__init__.py index 8ed08f8e1..99f1d3860 100644 --- a/gso/__init__.py +++ b/gso/__init__.py @@ -65,10 +65,11 @@ def init_gso_app() -> OrchestratorCore: def init_cli_app() -> typer.Typer: """Initialise GSO as a CLI application.""" - from gso.cli import imports, netbox # noqa: PLC0415 + from gso.cli import imports, netbox, schedule # noqa: PLC0415 cli_app.add_typer(imports.app, name="import-cli") cli_app.add_typer(netbox.app, name="netbox-cli") + cli_app.add_typer(schedule.app, name="schedule-cli") return cli_app() diff --git a/gso/cli/schedule.py b/gso/cli/schedule.py new file mode 100644 index 000000000..3446977d8 --- /dev/null +++ b/gso/cli/schedule.py @@ -0,0 +1,47 @@ +"""CLI for interacting with the task scheduler. Only supports running a single task.""" + +# <!-- vale off --> +# Copyright 2019-2020 SURF. +# Copyright 2025 GÉANT Vereniging. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# <!-- vale on --> + + +import logging +from collections.abc import Callable + +import typer + +from gso.schedules.clean_old_tasks import clean_old_tasks +from gso.schedules.send_email_notifications import send_email_notifications +from gso.schedules.validate_products import validate_products +from gso.schedules.validate_subscriptions import validate_subscriptions + +logger = logging.getLogger(__name__) + +app: typer.Typer = typer.Typer() + +ALL_SCHEDULES: list[Callable] = [ + clean_old_tasks, + send_email_notifications, + validate_products, + validate_subscriptions, +] + + +@app.command() +def run_task(task_name: str) -> None: + """Force the execution of a task by name.""" + for s in ALL_SCHEDULES: + if task_name == s.__name__: + s() -- GitLab