Skip to content
Snippets Groups Projects
Commit 4fe7813c authored by geant-release-service's avatar geant-release-service
Browse files

Finished release 1.5.

parents 0e15f35e e66bf5e9
Branches
Tags 1.5
No related merge requests found
Showing
with 14 additions and 730 deletions
StylesPath = styles
MinAlertLevel = suggestion
Vocab = geant-jargon, Sphinx
Packages = proselint, Microsoft
[*]
BasedOnStyles = Vale, proselint, Microsoft
; Found to be too intrusive
Microsoft.Passive = NO
; We are not a general audience
Microsoft.GeneralURL = NO
; It's okay to leave TODOs in the code, that's what they're for
proselint.Annotations = NO
; Replacing a ... with … shouldn't be holding back the entire CI pipeline
proselint.Typography = warning
; Same applies for not using contractions
Microsoft.Contractions = warning
Microsoft.Headings = NO
TokenIgnores = (:class:`\S+`)
[formats]
py = rst
param
[LSO|lso]
[Ss]ubpackages
Vereniging
against
"""Automatically invoked app factory."""
import logging
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from lso import config, environment
from lso.routes.default import router as default_router
from lso.routes.playbook import router as playbook_router
def create_app() -> FastAPI:
"""Override default settings with those found in the file read from environment variable `SETTINGS_FILENAME`.
:return: a new flask app instance
"""
app = FastAPI()
app.add_middleware(
CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]
)
app.include_router(default_router, prefix="/api")
app.include_router(playbook_router, prefix="/api/playbook")
# test that config params are loaded and available
config.load()
environment.setup_logging()
logging.info("FastAPI app initialized")
return app
"""Default app creation."""
import lso
app = lso.create_app()
if __name__ == "__main__":
import uvicorn
uvicorn.run("lso.app:app", host="0.0.0.0", port=44444, log_level="debug")
"""A module for loading configuration data, including a config schema that data is validated against.
Data is loaded from a file, the location of which may be specified when using :func:`load_from_file`.
Config file location can also be loaded from environment variable ``$SETTINGS_FILENAME``, which is default behaviour in
:func:`load`.
"""
import json
import os
from pathlib import Path
import jsonschema
from pydantic import BaseModel
CONFIG_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {"ansible_playbooks_root_dir": {"type": "string"}},
"required": ["ansible_playbooks_root_dir"],
"additionalProperties": False,
}
DEFAULT_REQUEST_TIMEOUT = 10
class Config(BaseModel):
"""Simple Config class.
Contains the root directory at which Ansible playbooks are present.
"""
ansible_playbooks_root_dir: str
def load_from_file(file: Path) -> Config:
"""Load, validate and return configuration parameters.
Input is validated against this jsonschema:
.. asjson:: lso.config.CONFIG_SCHEMA
:param file: :class:`Path` object that produces the config file.
:return: a dict containing the parsed configuration parameters.
"""
config = json.loads(file.read_text())
jsonschema.validate(config, CONFIG_SCHEMA)
return Config(**config)
def load() -> Config:
"""Load a config file, located at the path specified in the environment variable ``$SETTINGS_FILENAME``.
Loading and validating the file is performed by :func:`load_from_file`.
:return: a dict containing the parsed configuration parameters
"""
assert "SETTINGS_FILENAME" in os.environ, "Environment variable SETTINGS_FILENAME not set" # noqa: S101
return load_from_file(Path(os.environ["SETTINGS_FILENAME"]))
"""Environment module for setting up logging."""
import json
import logging.config
import os
from pathlib import Path
LOGGING_DEFAULT_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {"simple": {"format": "%(asctime)s - %(name)s (%(lineno)d) - %(levelname)s - %(message)s"}},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": "DEBUG",
"formatter": "simple",
"stream": "ext://sys.stdout",
}
},
"loggers": {"resource_management": {"level": "DEBUG", "handlers": ["console"], "propagate": False}},
"root": {"level": "INFO", "handlers": ["console"]},
}
def setup_logging() -> None:
"""Set up logging using the configured filename.
If LOGGING_CONFIG is defined in the environment, use this for the filename, otherwise use LOGGING_DEFAULT_CONFIG.
"""
logging_config = LOGGING_DEFAULT_CONFIG
if "LOGGING_CONFIG" in os.environ:
filename = os.environ["LOGGING_CONFIG"]
config_file = Path(filename).read_text()
logging_config = json.loads(config_file)
logging.config.dictConfig(logging_config)
"""Module that gathers common API responses and data models."""
import logging
import threading
import uuid
from pathlib import Path
from typing import Any
import ansible_runner
import requests
from fastapi import status
from fastapi.responses import JSONResponse
from pydantic import HttpUrl
from lso import config
from lso.config import DEFAULT_REQUEST_TIMEOUT
logger = logging.getLogger(__name__)
def get_playbook_path(playbook_name: str) -> Path:
"""Get the path of a playbook on the local filesystem."""
config_params = config.load()
return Path(config_params.ansible_playbooks_root_dir) / playbook_name
def playbook_launch_success(job_id: str) -> JSONResponse:
"""Return a :class:`PlaybookLaunchResponse` for the successful start of a playbook execution.
:return JSONResponse: A playbook launch response that's successful.
"""
return JSONResponse(content={"job_id": job_id}, status_code=status.HTTP_201_CREATED)
def playbook_launch_error(reason: str, status_code: int = status.HTTP_400_BAD_REQUEST) -> JSONResponse:
"""Return a :class:`PlaybookLaunchResponse` for the erroneous start of a playbook execution.
:param str reason: The reason why a request has failed.
:param status status_code: The HTTP status code that should be associated with this request. Defaults to HTTP 400:
Bad request.
:return JSONResponse: A playbook launch response that's unsuccessful.
"""
return JSONResponse(content={"error": reason}, status_code=status_code)
def _run_playbook_proc(
job_id: str, playbook_path: str, extra_vars: dict, inventory: dict[str, Any] | str, callback: str
) -> None:
"""Run a playbook, internal function.
:param str job_id: Identifier of the job that's executed.
:param str playbook_path: Ansible playbook to be executed.
:param dict extra_vars: Extra variables passed to the Ansible playbook.
:param str callback: Callback URL to return output to when execution is completed.
:param dict[str, Any] | str inventory: Ansible inventory to run the playbook against.
"""
ansible_playbook_run = ansible_runner.run(playbook=playbook_path, inventory=inventory, extravars=extra_vars)
payload = {
"status": ansible_playbook_run.status,
"job_id": job_id,
"output": ansible_playbook_run.stdout.readlines(),
"return_code": int(ansible_playbook_run.rc),
}
request_result = requests.post(callback, json=payload, timeout=DEFAULT_REQUEST_TIMEOUT)
if not status.HTTP_200_OK <= request_result.status_code < status.HTTP_300_MULTIPLE_CHOICES:
msg = f"Callback failed: {request_result.text}"
logger.error(msg)
def run_playbook(
playbook_path: Path,
extra_vars: dict[str, Any],
inventory: dict[str, Any] | str,
callback: HttpUrl,
) -> JSONResponse:
"""Run an Ansible playbook against a specified inventory.
:param Path playbook_path: playbook to be executed.
:param dict[str, Any] extra_vars: Any extra vars needed for the playbook to run.
:param dict[str, Any] | str inventory: The inventory that the playbook is executed against.
:param HttpUrl callback: Callback URL where the playbook should send a status update when execution is completed.
This is used for workflow-orchestrator to continue with the next step in a workflow.
:return: Result of playbook launch, this could either be successful or unsuccessful.
:rtype: :class:`fastapi.responses.JSONResponse`
"""
if not Path.exists(playbook_path):
msg = f"Filename '{playbook_path}' does not exist."
return playbook_launch_error(reason=msg, status_code=status.HTTP_404_NOT_FOUND)
if not ansible_runner.utils.isinventory(inventory):
msg = "Invalid inventory provided. Should be a string, or JSON object."
return playbook_launch_error(reason=msg, status_code=status.HTTP_400_BAD_REQUEST)
job_id = str(uuid.uuid4())
thread = threading.Thread(
target=_run_playbook_proc,
kwargs={
"job_id": job_id,
"playbook_path": str(playbook_path),
"inventory": inventory,
"extra_vars": extra_vars,
"callback": callback,
},
)
thread.start()
return playbook_launch_success(job_id=job_id)
"""Module of all routes that are available in LSO."""
"""Default route located at the root URL /.
For now only includes a single endpoint that responds with the current version of the API and LSO.
"""
from importlib import metadata
from fastapi import APIRouter
from pydantic import BaseModel, constr
API_VERSION = "1.0"
VersionString = constr(pattern=r"\d+\.\d+")
router = APIRouter()
class Version(BaseModel):
"""Simple model for returning a version number of both the API and the `goat-lso` module."""
api: VersionString # type: ignore[valid-type]
module: VersionString # type: ignore[valid-type]
@router.get("/version")
def version() -> Version:
"""Return the version numbers of the API version, and the module version.
:return: Version object with both API and `goat-lso` versions numbers.
"""
return Version(api=API_VERSION, module=metadata.version("goat-lso"))
"""The API endpoint from which Ansible playbooks can be executed."""
import json
import tempfile
from contextlib import redirect_stderr
from io import StringIO
from typing import Annotated, Any
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from fastapi import APIRouter, HTTPException, status
from fastapi.responses import JSONResponse
from pydantic import AfterValidator, BaseModel, HttpUrl
from lso.playbook import get_playbook_path, run_playbook
router = APIRouter()
def _inventory_validator(inventory: dict[str, Any] | str) -> dict[str, Any] | str:
"""Validate the format of the provided inventory by trying to parse it.
If an inventory cannot be parsed without warnings or errors, these are returned to the user by means of an HTTP
status 422 for 'unprocessable entity'.
"""
loader = DataLoader()
output = StringIO()
with tempfile.NamedTemporaryFile(mode="w+") as temp_inv, redirect_stderr(output):
json.dump(inventory, temp_inv, ensure_ascii=False)
temp_inv.flush()
inventory_manager = InventoryManager(loader=loader, sources=[temp_inv.name], parse=True)
inventory_manager.parse_source(temp_inv.name)
output.seek(0)
error_messages = output.readlines()
if error_messages:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=error_messages)
return inventory
PlaybookInventory = Annotated[dict[str, Any] | str, AfterValidator(_inventory_validator)]
class PlaybookRunParams(BaseModel):
"""Parameters for executing an Ansible playbook."""
#: The filename of a playbook that is executed. It should be present inside the directory defined in the
#: configuration option ``ansible_playbooks_root_dir``.
playbook_name: str
#: The address where LSO should call back to upon completion.
callback: HttpUrl
#: The inventory to run the playbook against. This inventory can also include any host vars, if needed. When
#: including host vars, it should be a dictionary. Can be a simple string containing hostnames when no host vars are
#: needed. In the latter case, multiple hosts should be separated with a ``\n`` newline character only.
inventory: PlaybookInventory
#: Extra variables that should get passed to the playbook. This includes any required configuration objects
#: from the workflow orchestrator, commit comments, whether this execution should be a dry run, a trouble ticket
#: number, etc. Which extra vars are required solely depends on what inputs the playbook requires.
extra_vars: dict[str, Any] = {}
@router.post("/")
def run_playbook_endpoint(params: PlaybookRunParams) -> JSONResponse:
"""Launch an Ansible playbook to modify or deploy a subscription instance.
The response will contain either a job ID, or error information.
:param PlaybookRunParams params: Parameters for executing a playbook.
:return JSONResponse: Response from the Ansible runner, including a run ID.
"""
return run_playbook(
playbook_path=get_playbook_path(params.playbook_name),
extra_vars=params.extra_vars,
inventory=params.inventory,
callback=params.callback,
)
[tool.mypy]
exclude = [
"venv",
"test/*",
"docs"
]
ignore_missing_imports = true
disallow_untyped_calls = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
strict_optional = true
namespace_packages = true
warn_unused_ignores = true
warn_redundant_casts = true
warn_no_return = true
warn_unreachable = true
implicit_reexport = false
strict_equality = true
show_error_codes = true
show_column_numbers = true
# Suppress "note: By default the bodies of untyped functions are not checked"
disable_error_code = "annotation-unchecked"
# Forbid the use of a generic "type: ignore" without specifying the exact error that is ignored
enable_error_code = "ignore-without-code"
[tool.ruff]
extend-exclude = [
"htmlcov",
"docs",
]
ignore = [
"COM812",
"D203",
"D213",
"N805",
"PLR0913",
"PLR0904",
"PLW1514",
"S104"
]
line-length = 120
select = [
"A",
"ARG",
"B",
"BLE",
"C",
"COM",
"C4",
"C90",
"D",
"DTZ",
"E",
"EM",
"ERA",
"F",
"FA",
"FBT",
"FLY",
"FURB",
"G",
"I",
"ICN",
"INP",
"ISC",
"LOG",
"N",
"PERF",
"PGH",
"PIE",
"PL",
"PT",
"PTH",
"PYI",
"Q",
"RET",
"R",
"RET",
"RSE",
"RUF",
"S",
"SIM",
"SLF",
"T",
"T20",
"TID",
"TRY",
"UP",
"W",
"YTT"
]
target-version = "py311"
[tool.ruff.flake8-tidy-imports]
ban-relative-imports = "all"
[tool.ruff.per-file-ignores]
"test/*" = ["D", "S101"]
"setup.py" = ["D100"]
[tool.ruff.isort]
known-third-party = ["pydantic", "migrations"]
known-first-party = ["test", "docs"]
pytest~=7.4.3 ansible_merge_vars~=5.0.0
Faker~=20.0.3 jinja2==3.1.2
responses~=0.24.1 jmespath~=1.0.1
sphinx~=7.2.6 junos-eznc~=2.6.8
sphinx-rtd-theme~=1.3.0 jxmlease~=1.0.3
docutils~=0.18.1 ncclient~=0.6.13
mypy~=1.7.0 netaddr~=0.8.0
ruff~=0.1.6 requests~=2.31.0
types-setuptools~=68.2.0.1 ruamel.yaml~=0.18.5
types-requests~=2.31.0.10
from setuptools import find_packages, setup from setuptools import setup
setup( setup(
name="goat-lso", name="goat-lso",
version="1.4", version="1.5",
author="GÉANT Orchestration & Automation Team", author="GÉANT Orchestration & Automation Team",
author_email="goat@geant.org", author_email="goat@geant.org",
description="Lightweight Service Orchestrator", description="Lightweight Service Orchestrator",
url="https://gitlab.software.geant.org/goat/gap/lso", url="https://gitlab.software.geant.org/goat/gap/lso",
packages=find_packages(), packages=[],
install_requires=[ install_requires=[],
"ansible_merge_vars~=5.0.0",
"ansible-runner~=2.3.4",
"ansible~=8.6.1",
"dictdiffer~=0.9.0",
"fastapi~=0.104.1",
"GitPython~=3.1.40",
"httpx~=0.25.1",
"jinja2==3.1.2",
"jmespath~=1.0.1",
"jsonschema~=4.20.0",
"junos-eznc~=2.6.8",
"jxmlease~=1.0.3",
"ncclient~=0.6.13",
"netaddr~=0.8.0",
"pydantic~=2.0.3",
"requests~=2.31.0",
"ruamel.yaml~=0.18.5",
"uvicorn[standard]~=0.22.0",
"xmltodict~=0.13.0",
],
license="MIT", license="MIT",
license_files=("LICENSE.txt",), license_files=("LICENSE.txt",),
classifiers=[ classifiers=[],
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.11",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Development Status :: 3 - Alpha",
"Framework :: FastAPI",
"Intended Audience :: System Administrators",
"Intended Audience :: Telecommunications Industry",
],
) )
sonar.projectKey=lso
sonar.projectName='Lightweight Service Orchestrator'
sonar.projectVersion=1.0
sonar.sources=lso
sonar.python.coverage.reportPaths=coverage.xml
sonar.host.url=https://sonarqube.software.geant.org/
import json
import os
import tempfile
from collections.abc import Callable, Generator
from io import StringIO
from pathlib import Path
from typing import Any
import pytest
from faker import Faker
from fastapi.testclient import TestClient
import lso
@pytest.fixture()
def mocked_ansible_runner_run() -> Callable:
class Runner:
def __init__(self) -> None:
self.status = "success"
self.rc = 0
self.stdout = StringIO("[{'step one': 'results'}, {'step two': 2}]")
def run(*args: Any, **kwargs: Any) -> Runner: # noqa: ARG001
return Runner()
return run
@pytest.fixture(scope="session")
def configuration_data() -> dict[str, str]:
"""Start the server with valid configuration data."""
with tempfile.TemporaryDirectory() as tempdir:
# Create required YAML files for the unit tests
(Path(tempdir) / "placeholder.yaml").touch()
yield {"ansible_playbooks_root_dir": tempdir}
@pytest.fixture(scope="session")
def data_config_filename(configuration_data: dict[str, str]) -> Generator[str, Any, None]:
"""Fixture that will yield a filename that contains a valid configuration.
:return: Path to valid configuration file
"""
with tempfile.NamedTemporaryFile(mode="w") as file:
file.write(json.dumps(configuration_data))
file.flush()
yield file.name
@pytest.fixture(scope="session")
def client(data_config_filename: str) -> TestClient:
"""Return a client that can be used to test the server."""
os.environ["SETTINGS_FILENAME"] = data_config_filename
app = lso.create_app()
return TestClient(app) # wait here until calling context ends
@pytest.fixture(scope="session")
def faker() -> Faker:
return Faker(locale="en_GB")
from importlib import metadata
import jsonschema
import responses
from fastapi import status
from starlette.testclient import TestClient
from lso.routes.default import API_VERSION, Version
@responses.activate
def test_ip_trunk_modification(client: TestClient) -> None:
rv = client.get("/api/version/")
assert rv.status_code == status.HTTP_200_OK, rv.text
response = rv.json()
jsonschema.validate(response, Version.model_json_schema())
assert response["api"] == API_VERSION, response["api"]
assert response["module"] == metadata.version("goat-lso"), response["module"]
import re
import time
from collections.abc import Callable
from unittest.mock import patch
import responses
from fastapi import status
from fastapi.testclient import TestClient
TEST_CALLBACK_URL = "https://fqdn.abc.xyz/api/resume"
@responses.activate
def test_playbook_endpoint_dict_inventory_success(client: TestClient, mocked_ansible_runner_run: Callable) -> None:
responses.post(url=TEST_CALLBACK_URL, status=status.HTTP_200_OK)
params = {
"playbook_name": "placeholder.yaml",
"callback": TEST_CALLBACK_URL,
"inventory": {
"_meta": {"vars": {"host1.local": {"foo": "bar"}, "host2.local": {"hello": "world"}}},
"all": {"hosts": {"host1.local": None, "host2.local": None}},
},
"extra_vars": {"dry_run": True, "commit_comment": "I am a robot!"},
}
with patch("lso.playbook.ansible_runner.run", new=mocked_ansible_runner_run) as _:
rv = client.post("/api/playbook/", json=params)
assert rv.status_code == status.HTTP_201_CREATED
response = rv.json()
# wait one second for the run thread to finish
time.sleep(1)
assert isinstance(response, dict)
assert isinstance(response["job_id"], str)
responses.assert_call_count(TEST_CALLBACK_URL, 1)
@responses.activate
def test_playbook_endpoint_str_inventory_success(client: TestClient, mocked_ansible_runner_run: Callable) -> None:
responses.post(url=TEST_CALLBACK_URL, status=status.HTTP_200_OK)
params = {
"playbook_name": "placeholder.yaml",
"callback": TEST_CALLBACK_URL,
"inventory": {"all": {"hosts": "host1.local\nhost2.local\nhost3.local"}},
}
with patch("lso.playbook.ansible_runner.run", new=mocked_ansible_runner_run) as _:
rv = client.post("/api/playbook/", json=params)
assert rv.status_code == status.HTTP_201_CREATED
response = rv.json()
# wait one second for the run thread to finish
time.sleep(1)
assert isinstance(response, dict)
assert isinstance(response["job_id"], str)
responses.assert_call_count(TEST_CALLBACK_URL, 1)
@responses.activate
def test_playbook_endpoint_invalid_host_vars(client: TestClient, mocked_ansible_runner_run: Callable) -> None:
params = {
"playbook_name": "placeholder.yaml",
"callback": TEST_CALLBACK_URL,
"inventory": {
"_meta": {"host_vars": {"host1.local": {"foo": "bar"}, "host2.local": {"hello": "world"}}},
"all": {"hosts": "host1.local\nhost2.local\nhost3.local"},
},
}
with patch("lso.playbook.ansible_runner.run", new=mocked_ansible_runner_run) as _:
rv = client.post("/api/playbook/", json=params)
assert rv.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
response = rv.json()
# wait one second for the run thread to finish
time.sleep(1)
assert isinstance(response, dict)
assert response["detail"] == [
'[WARNING]: Skipping unexpected key (host_vars) in group (_meta), only "vars",\n',
'"children" and "hosts" are valid\n',
]
responses.assert_call_count(TEST_CALLBACK_URL, 0)
@responses.activate
def test_playbook_endpoint_invalid_hosts(client: TestClient, mocked_ansible_runner_run: Callable) -> None:
params = {
"playbook_name": "placeholder.yaml",
"callback": TEST_CALLBACK_URL,
"inventory": {
"_meta": {"vars": {"host1.local": {"foo": "bar"}}},
"all": {"hosts": ["host1.local", "host2.local", "host3.local"]},
},
}
with patch("lso.playbook.ansible_runner.run", new=mocked_ansible_runner_run) as _:
rv = client.post("/api/playbook/", json=params)
assert rv.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
response = rv.json()
# wait one second for the run thread to finish
time.sleep(1)
assert isinstance(response, dict)
assert 'Invalid "hosts" entry for "all" group' in re.sub("\n", " ", "".join(response["detail"]))
responses.assert_call_count(TEST_CALLBACK_URL, 0)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment