Skip to content
Snippets Groups Projects
Commit fcc72cd1 authored by Mohammad Torkashvand's avatar Mohammad Torkashvand
Browse files

fix ruff errors

parent 1cc03792
No related branches found
No related tags found
No related merge requests found
Pipeline #86639 failed
This commit is part of merge request !188. Comments created here will be created in the context of that merge request.
......@@ -28,7 +28,7 @@ def init_worker_app() -> OrchestratorCore:
def init_cli_app() -> typer.Typer:
"""Initialise :term:`GSO` as a CLI application."""
from gso.cli import imports, netbox
from gso.cli import imports, netbox # noqa: PLC0415
cli_app.add_typer(imports.app, name="import-cli")
cli_app.add_typer(netbox.app, name="netbox-cli")
......
......@@ -7,16 +7,21 @@ from typing import Any
from celery import current_app
from celery.schedules import crontab
from pydantic import BaseModel
def scheduler(
name: str,
minute: str = "*",
hour: str = "*",
day_of_week: str = "*",
day_of_month: str = "*",
month_of_year: str = "*",
) -> Callable[[Callable], Callable]:
class CronScheduleConfig(BaseModel):
"""Configuration for scheduling a task using crontab-like timing parameters."""
name: str
minute: str = "*"
hour: str = "*"
day_of_week: str = "*"
day_of_month: str = "*"
month_of_year: str = "*"
def scheduler(cron_scheduler_config: CronScheduleConfig) -> Callable[[Callable], Callable]:
"""Schedule a Celery task using crontab-like timing.
Examples
......@@ -44,14 +49,14 @@ def scheduler(
task_path = f"{module.__name__}.{task_func.__name__}"
current_app.conf.beat_schedule[task_func.__name__] = {
"name": name,
"name": cron_scheduler_config.name,
"task": task_path,
"schedule": crontab(
minute=minute,
hour=hour,
day_of_month=day_of_month,
month_of_year=month_of_year,
day_of_week=day_of_week,
minute=cron_scheduler_config.minute,
hour=cron_scheduler_config.hour,
day_of_month=cron_scheduler_config.day_of_month,
month_of_year=cron_scheduler_config.month_of_year,
day_of_week=cron_scheduler_config.day_of_week,
),
}
......
......@@ -2,12 +2,12 @@
from orchestrator.services.processes import start_process
from gso.schedules.scheduling import scheduler
from gso.schedules.scheduling import CronScheduleConfig, scheduler
from gso.worker import celery
@celery.task
@scheduler(name="Clean up tasks", hour="*/6")
@scheduler(CronScheduleConfig(name="Clean up tasks", hour="*/6"))
def vacuum_tasks() -> None:
"""Run all cleanup tasks every 6 hours."""
start_process("task_clean_up_tasks")
......@@ -2,13 +2,13 @@
from orchestrator.services.processes import start_process
from gso.schedules.scheduling import scheduler
from gso.schedules.scheduling import CronScheduleConfig, scheduler
from gso.services.subscriptions import count_incomplete_validate_products
from gso.worker import celery
@celery.task
@scheduler(name="Validate Products and inactive subscriptions", minute="30", hour="2")
@scheduler(CronScheduleConfig(name="Validate Products and inactive subscriptions", minute="30", hour="2"))
def validate_products() -> None:
"""Validate all products."""
if count_incomplete_validate_products() > 0:
......
......@@ -5,7 +5,7 @@ from orchestrator.services.processes import get_execution_context
from orchestrator.services.subscriptions import TARGET_DEFAULT_USABLE_MAP, WF_USABLE_MAP
from orchestrator.targets import Target
from gso.schedules.scheduling import scheduler
from gso.schedules.scheduling import CronScheduleConfig, scheduler
from gso.services.subscriptions import get_insync_subscriptions
from gso.worker import celery
......@@ -13,7 +13,7 @@ logger = structlog.get_logger(__name__)
@celery.task
@scheduler(name="Subscriptions Validator", minute="10", hour="0")
@scheduler(CronScheduleConfig(name="Subscriptions Validator", minute="10", hour="0"))
def validate_subscriptions() -> None:
"""Validate all subscriptions using their corresponding validation workflow."""
subscriptions = get_insync_subscriptions()
......
......@@ -37,7 +37,7 @@ def _setup_connection() -> tuple[connector.Connector, IPAMParams]:
return connector.Connector(options), oss
def _allocate_network(
def _allocate_network( # noqa: PLR0917
conn: connector.Connector,
dns_view: str,
network_view: str,
......
"""Shared choices for the different models."""
import ipaddress
from typing import Annotated
from typing import Annotated, Any
from pydantic import Field, PlainSerializer
from pydantic_forms.types import strEnum
from typing_extensions import Doc
def convert_to_str(value: Any) -> str:
"""Convert the value to a string."""
return str(value)
class Vendor(strEnum):
"""Enumerator for the different product vendors that are supported."""
......@@ -29,11 +34,11 @@ PortNumber = Annotated[
IPv4AddressType = Annotated[
ipaddress.IPv4Address, PlainSerializer(lambda ip: str(ip), return_type=str, when_used="always")
ipaddress.IPv4Address, PlainSerializer(convert_to_str, return_type=str, when_used="always")
]
IPv6AddressType = Annotated[
ipaddress.IPv6Address, PlainSerializer(lambda ip: str(ip), return_type=str, when_used="always")
ipaddress.IPv6Address, PlainSerializer(convert_to_str, return_type=str, when_used="always")
]
......
......@@ -9,7 +9,7 @@ from gso.settings import load_oss_params
class OrchestratorCelery(Celery):
"""A :term:`GSO` instance that functions as a Celery worker."""
def on_init(self) -> None:
def on_init(self) -> None: # noqa: PLR6301
"""Initialise a new Celery worker."""
init_worker_app()
......
......@@ -113,3 +113,6 @@ filterwarnings = [
"ignore",
"default:::gso",
]
[tool.ruff.lint.per-file-ignores]
"test/*" = ["PLR0917", "S101", "D104", "D105", "D103", "D100", "ARG001", "D102", "PLR2004", "D101", "D106", "D107", "PLR0914", "PLC0415", "PLC2701"]
"gso/workflows/*" = ["PLR0917", "PLR0914"]
\ No newline at end of file
"""Setup script for the GÉANT Service Orchestrator."""
from setuptools import find_packages, setup
setup(
......
......@@ -268,7 +268,7 @@ async def test_oidc_user_call_no_token(oidc_user, mock_request):
mock_post.return_value = MagicMock(status_code=200, json=lambda: {"active": False})
mock_get.return_value = MagicMock(status_code=200, json=dict)
result = await oidc_user.__call__(mock_request)
result = await oidc_user.__call__(mock_request) # noqa: PLC2801
assert result is None
......@@ -281,7 +281,7 @@ async def test_oidc_user_call_token_from_request(oidc_user, mock_request, mock_a
oidc_user.introspect_token = AsyncMock(return_value={"active": True})
oidc_user.userinfo = AsyncMock(return_value=OIDCUserModel({"sub": "123", "name": "John Doe"}))
result = await oidc_user.__call__(mock_request)
result = await oidc_user.__call__(mock_request) # noqa: PLC2801
assert isinstance(result, OIDCUserModel)
assert result["sub"] == "123"
......
......@@ -57,7 +57,7 @@ def pytest_collection_modifyitems(config, items):
class UseJuniperSide(strEnum):
"""Define on tests on which side to use Juniper router"""
"""Define on tests on which side to use Juniper router."""
NONE = "none"
SIDE_A = "side_a"
......
......@@ -3,7 +3,7 @@ from unittest.mock import MagicMock, patch
import pytest
from orchestrator.targets import Target
from gso.schedules.scheduling import scheduler
from gso.schedules.scheduling import CronScheduleConfig, scheduler
@pytest.fixture(scope="module")
......@@ -42,12 +42,14 @@ def test_scheduler_updates_beat_schedule(mock_celery):
mock_celery.conf.beat_schedule = {}
@scheduler(
name="A cool task",
minute="0",
hour="0",
day_of_week="*",
day_of_month="*",
month_of_year="*",
CronScheduleConfig(
name="A cool task",
minute="0",
hour="0",
day_of_week="*",
day_of_month="*",
month_of_year="*",
)
)
def mock_task():
return "task result"
......@@ -64,12 +66,14 @@ def test_scheduled_task_still_works():
"""Ensure that the scheduler decorator does not change the behavior of the function it decorates."""
@scheduler(
name="A cool task",
minute="0",
hour="0",
day_of_week="*",
day_of_month="*",
month_of_year="*",
CronScheduleConfig(
name="A cool task",
minute="0",
hour="0",
day_of_week="*",
day_of_month="*",
month_of_year="*",
)
)
def mock_task():
return "task result"
......
......@@ -23,7 +23,7 @@ def mock_netbox_client():
@pytest.fixture()
def generate_tt_numbers(faker, request):
"""Generator for valid and invalid tt numbers."""
"""Get a Generator for valid and invalid tt numbers."""
valid_count = request.param.get("valid", 0)
invalid_count = request.param.get("invalid", 0)
......@@ -78,7 +78,7 @@ def test_nokia_router_with_interfaces_returns_choice(mock_router, mock_netbox_cl
@pytest.mark.parametrize("generate_tt_numbers", [{"valid": 5, "invalid": 3}], indirect=True)
def test_tt_number(generate_tt_numbers):
"""Test different TT numbers"""
"""Test different TT numbers."""
for tt_number, is_valid in generate_tt_numbers:
if is_valid:
assert validate_tt_number(tt_number) == tt_number
......
......@@ -157,7 +157,8 @@ class WorkflowInstanceForTests(LazyWorkflowInstance):
This can be as simple as merely importing a workflow function. However, if it concerns a workflow generating
function, that function will be called with or without arguments as specified.
Returns:
Returns
-------
A workflow function.
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment