Skip to content
Snippets Groups Projects

Improve logging in the subscription validation task

Merged Karel van Klink requested to merge feature/rework-task-scheduler into develop
3 files
+ 73
9
Compare changes
  • Side-by-side
  • Inline
Files
3
+ 46
0
 
"""CLI for interacting with the task scheduler. Only supports running a single task."""
 
 
# <!-- vale off -->
 
# Copyright 2019-2020 SURF.
 
# Copyright 2025 GÉANT Vereniging.
 
# Licensed under the Apache License, Version 2.0 (the "License");
 
# you may not use this file except in compliance with the License.
 
# You may obtain a copy of the License at
 
#
 
# http://www.apache.org/licenses/LICENSE-2.0
 
#
 
# Unless required by applicable law or agreed to in writing, software
 
# distributed under the License is distributed on an "AS IS" BASIS,
 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
# See the License for the specific language governing permissions and
 
# limitations under the License.
 
# <!-- vale on -->
 
 
import logging
 
from collections.abc import Callable
 
 
import typer
 
 
from gso.schedules.clean_old_tasks import clean_old_tasks
 
from gso.schedules.send_email_notifications import send_email_notifications
 
from gso.schedules.validate_products import validate_products
 
from gso.schedules.validate_subscriptions import validate_subscriptions
 
 
logger = logging.getLogger(__name__)
 
 
app: typer.Typer = typer.Typer()
 
 
ALL_SCHEDULES: list[Callable] = [
 
clean_old_tasks,
 
send_email_notifications,
 
validate_products,
 
validate_subscriptions,
 
]
 
 
 
@app.command()
 
def run_task(task_name: str) -> None:
 
"""Force the execution of a task by name."""
 
for s in ALL_SCHEDULES:
 
if task_name == s.__name__:
 
s()
Loading