Skip to content
Snippets Groups Projects
Commit a996af8f authored by Mohammad Torkashvand's avatar Mohammad Torkashvand :wolf:
Browse files

fix ruff errors

parent f38cecd1
No related branches found
No related tags found
1 merge request!188upgrade to orchestrato-core v2
......@@ -28,7 +28,7 @@ def init_worker_app() -> OrchestratorCore:
def init_cli_app() -> typer.Typer:
"""Initialise :term:`GSO` as a CLI application."""
from gso.cli import imports, netbox
from gso.cli import imports, netbox # noqa: PLC0415
cli_app.add_typer(imports.app, name="import-cli")
cli_app.add_typer(netbox.app, name="netbox-cli")
......
......
......@@ -7,16 +7,21 @@ from typing import Any
from celery import current_app
from celery.schedules import crontab
from pydantic import BaseModel
def scheduler(
name: str,
minute: str = "*",
hour: str = "*",
day_of_week: str = "*",
day_of_month: str = "*",
month_of_year: str = "*",
) -> Callable[[Callable], Callable]:
class CronScheduleConfig(BaseModel):
"""Configuration for scheduling a task using crontab-like timing parameters."""
name: str
minute: str = "*"
hour: str = "*"
day_of_week: str = "*"
day_of_month: str = "*"
month_of_year: str = "*"
def scheduler(cron_scheduler_config: CronScheduleConfig) -> Callable[[Callable], Callable]:
"""Schedule a Celery task using crontab-like timing.
Examples
......@@ -44,14 +49,14 @@ def scheduler(
task_path = f"{module.__name__}.{task_func.__name__}"
current_app.conf.beat_schedule[task_func.__name__] = {
"name": name,
"name": cron_scheduler_config.name,
"task": task_path,
"schedule": crontab(
minute=minute,
hour=hour,
day_of_month=day_of_month,
month_of_year=month_of_year,
day_of_week=day_of_week,
minute=cron_scheduler_config.minute,
hour=cron_scheduler_config.hour,
day_of_month=cron_scheduler_config.day_of_month,
month_of_year=cron_scheduler_config.month_of_year,
day_of_week=cron_scheduler_config.day_of_week,
),
}
......
......
......@@ -2,12 +2,12 @@
from orchestrator.services.processes import start_process
from gso.schedules.scheduling import scheduler
from gso.schedules.scheduling import CronScheduleConfig, scheduler
from gso.worker import celery
@celery.task
@scheduler(name="Clean up tasks", hour="*/6")
@scheduler(CronScheduleConfig(name="Clean up tasks", hour="*/6"))
def vacuum_tasks() -> None:
"""Run all cleanup tasks every 6 hours."""
start_process("task_clean_up_tasks")
......@@ -2,13 +2,13 @@
from orchestrator.services.processes import start_process
from gso.schedules.scheduling import scheduler
from gso.schedules.scheduling import CronScheduleConfig, scheduler
from gso.services.subscriptions import count_incomplete_validate_products
from gso.worker import celery
@celery.task
@scheduler(name="Validate Products and inactive subscriptions", minute="30", hour="2")
@scheduler(CronScheduleConfig(name="Validate Products and inactive subscriptions", minute="30", hour="2"))
def validate_products() -> None:
"""Validate all products."""
if count_incomplete_validate_products() > 0:
......
......
......@@ -5,7 +5,7 @@ from orchestrator.services.processes import get_execution_context
from orchestrator.services.subscriptions import TARGET_DEFAULT_USABLE_MAP, WF_USABLE_MAP
from orchestrator.targets import Target
from gso.schedules.scheduling import scheduler
from gso.schedules.scheduling import CronScheduleConfig, scheduler
from gso.services.subscriptions import get_insync_subscriptions
from gso.worker import celery
......@@ -13,7 +13,7 @@ logger = structlog.get_logger(__name__)
@celery.task
@scheduler(name="Subscriptions Validator", minute="10", hour="0")
@scheduler(CronScheduleConfig(name="Subscriptions Validator", minute="10", hour="0"))
def validate_subscriptions() -> None:
"""Validate all subscriptions using their corresponding validation workflow."""
subscriptions = get_insync_subscriptions()
......
......
......@@ -37,7 +37,7 @@ def _setup_connection() -> tuple[connector.Connector, IPAMParams]:
return connector.Connector(options), oss
def _allocate_network(
def _allocate_network( # noqa: PLR0917
conn: connector.Connector,
dns_view: str,
network_view: str,
......
......
"""Shared choices for the different models."""
import ipaddress
from typing import Annotated
from typing import Annotated, Any
from pydantic import Field, PlainSerializer
from pydantic_forms.types import strEnum
from typing_extensions import Doc
def convert_to_str(value: Any) -> str:
"""Convert the value to a string."""
return str(value)
class Vendor(strEnum):
"""Enumerator for the different product vendors that are supported."""
......@@ -29,11 +34,11 @@ PortNumber = Annotated[
IPv4AddressType = Annotated[
ipaddress.IPv4Address, PlainSerializer(lambda ip: str(ip), return_type=str, when_used="always")
ipaddress.IPv4Address, PlainSerializer(convert_to_str, return_type=str, when_used="always")
]
IPv6AddressType = Annotated[
ipaddress.IPv6Address, PlainSerializer(lambda ip: str(ip), return_type=str, when_used="always")
ipaddress.IPv6Address, PlainSerializer(convert_to_str, return_type=str, when_used="always")
]
......
......
......@@ -9,7 +9,7 @@ from gso.settings import load_oss_params
class OrchestratorCelery(Celery):
"""A :term:`GSO` instance that functions as a Celery worker."""
def on_init(self) -> None:
def on_init(self) -> None: # noqa: PLR6301
"""Initialise a new Celery worker."""
init_worker_app()
......
......
......@@ -113,3 +113,6 @@ filterwarnings = [
"ignore",
"default:::gso",
]
[tool.ruff.lint.per-file-ignores]
"test/*" = ["PLR0917", "S101", "D104", "D105", "D103", "D100", "ARG001", "D102", "PLR2004", "D101", "D106", "D107", "PLR0914", "PLC0415", "PLC2701"]
"gso/workflows/*" = ["PLR0917", "PLR0914"]
\ No newline at end of file
"""Setup script for the GÉANT Service Orchestrator."""
from setuptools import find_packages, setup
setup(
......
......
......@@ -268,7 +268,7 @@ async def test_oidc_user_call_no_token(oidc_user, mock_request):
mock_post.return_value = MagicMock(status_code=200, json=lambda: {"active": False})
mock_get.return_value = MagicMock(status_code=200, json=dict)
result = await oidc_user.__call__(mock_request)
result = await oidc_user.__call__(mock_request) # noqa: PLC2801
assert result is None
......@@ -281,7 +281,7 @@ async def test_oidc_user_call_token_from_request(oidc_user, mock_request, mock_a
oidc_user.introspect_token = AsyncMock(return_value={"active": True})
oidc_user.userinfo = AsyncMock(return_value=OIDCUserModel({"sub": "123", "name": "John Doe"}))
result = await oidc_user.__call__(mock_request)
result = await oidc_user.__call__(mock_request) # noqa: PLC2801
assert isinstance(result, OIDCUserModel)
assert result["sub"] == "123"
......
......
......@@ -57,7 +57,7 @@ def pytest_collection_modifyitems(config, items):
class UseJuniperSide(strEnum):
"""Define on tests on which side to use Juniper router"""
"""Define on tests on which side to use Juniper router."""
NONE = "none"
SIDE_A = "side_a"
......
......
......@@ -3,7 +3,7 @@ from unittest.mock import MagicMock, patch
import pytest
from orchestrator.targets import Target
from gso.schedules.scheduling import scheduler
from gso.schedules.scheduling import CronScheduleConfig, scheduler
@pytest.fixture(scope="module")
......@@ -42,6 +42,7 @@ def test_scheduler_updates_beat_schedule(mock_celery):
mock_celery.conf.beat_schedule = {}
@scheduler(
CronScheduleConfig(
name="A cool task",
minute="0",
hour="0",
......@@ -49,6 +50,7 @@ def test_scheduler_updates_beat_schedule(mock_celery):
day_of_month="*",
month_of_year="*",
)
)
def mock_task():
return "task result"
......@@ -64,6 +66,7 @@ def test_scheduled_task_still_works():
"""Ensure that the scheduler decorator does not change the behavior of the function it decorates."""
@scheduler(
CronScheduleConfig(
name="A cool task",
minute="0",
hour="0",
......@@ -71,6 +74,7 @@ def test_scheduled_task_still_works():
day_of_month="*",
month_of_year="*",
)
)
def mock_task():
return "task result"
......
......
......@@ -23,7 +23,7 @@ def mock_netbox_client():
@pytest.fixture()
def generate_tt_numbers(faker, request):
"""Generator for valid and invalid tt numbers."""
"""Get a Generator for valid and invalid tt numbers."""
valid_count = request.param.get("valid", 0)
invalid_count = request.param.get("invalid", 0)
......@@ -78,7 +78,7 @@ def test_nokia_router_with_interfaces_returns_choice(mock_router, mock_netbox_cl
@pytest.mark.parametrize("generate_tt_numbers", [{"valid": 5, "invalid": 3}], indirect=True)
def test_tt_number(generate_tt_numbers):
"""Test different TT numbers"""
"""Test different TT numbers."""
for tt_number, is_valid in generate_tt_numbers:
if is_valid:
assert validate_tt_number(tt_number) == tt_number
......
......
......@@ -157,7 +157,8 @@ class WorkflowInstanceForTests(LazyWorkflowInstance):
This can be as simple as merely importing a workflow function. However, if it concerns a workflow generating
function, that function will be called with or without arguments as specified.
Returns:
Returns
-------
A workflow function.
"""
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment