Skip to content
Snippets Groups Projects
Commit ccfcef7e authored by Mohammad Torkashvand's avatar Mohammad Torkashvand
Browse files

add pre-check command to get bgp status from LSO

parent 0285448b
No related branches found
No related tags found
1 merge request!440add pre-check command to get bgp status from LSO
...@@ -65,11 +65,12 @@ def init_gso_app() -> OrchestratorCore: ...@@ -65,11 +65,12 @@ def init_gso_app() -> OrchestratorCore:
def init_cli_app() -> typer.Typer: def init_cli_app() -> typer.Typer:
"""Initialise GSO as a CLI application.""" """Initialise GSO as a CLI application."""
from gso.cli import imports, netbox, schedule # noqa: PLC0415 from gso.cli import imports, netbox, prechecks, schedule # noqa: PLC0415
cli_app.add_typer(imports.app, name="import-cli") cli_app.add_typer(imports.app, name="import-cli")
cli_app.add_typer(netbox.app, name="netbox-cli") cli_app.add_typer(netbox.app, name="netbox-cli")
cli_app.add_typer(schedule.app, name="schedule-cli") cli_app.add_typer(schedule.app, name="schedule-cli")
cli_app.add_typer(prechecks.app, name="precheck-cli")
return cli_app() return cli_app()
......
#!/usr/bin/env python3
"""CLI for GSO pre-check using LSO remote exec endpoint."""
import json
import logging
import click
import httpx
import structlog
import typer
from orchestrator.db import db
from orchestrator.db.database import transactional
from pydantic import ValidationError
from gso import settings
from gso.db.models import BgpStatusPreCheckTable
from gso.utils.types.precheck import ExecutableRunResponse
logger = structlog.get_logger(__name__)
app = typer.Typer()
MAX_OUTPUT_LINES = 50 # Max lines to display before paging
@app.command()
def bgp_status(
host: str = typer.Argument(..., help="FQDN of the router to pre-check"),
nren: str = typer.Argument(..., help="NREN name for import file path"),
) -> None:
"""Trigger the bgp_status_pre-check script on LSO, wait for it to finish.
pretty-print the JSON result inline,
parse the `output` field as JSON-string and page it if large,
and optionally save to the database.
"""
oss = settings.load_oss_params()
p = oss.PROVISIONING_PROXY
payload = {
"executable_name": "bgp_status_pre_check.py",
"args": [host, nren],
"is_async": False,
}
url = f"{p.scheme}://{p.api_base}/api/execute/"
# 1) Call LSO
try:
resp = httpx.post(url, json=payload, timeout=30)
resp.raise_for_status()
except Exception as e:
logger.exception("LSO call failed: %s")
typer.echo(f"Error: failed to call LSO: {e}", err=True)
raise typer.Exit(1) from e
# 2) Validate response
try:
runner = ExecutableRunResponse(**resp.json())
except ValidationError as e:
logger.exception("Invalid response from LSO")
typer.echo("Error: invalid JSON returned by LSO:", err=True)
typer.echo(str(e), err=True)
raise typer.Exit(1) from e
# 3) Print full response inline
full = runner.model_dump(mode="json")
typer.echo(typer.style("\nFull LSO response:", fg=typer.colors.GREEN))
typer.echo(json.dumps(full, indent=2))
# 4) Parse and pretty-print the `output` field, with pagination if large
output_str = runner.result.output if runner.result else ""
typer.echo(typer.style("\nParsed `result.output` as JSON:", fg=typer.colors.CYAN))
try:
parsed = json.loads(output_str)
parsed_text = json.dumps(parsed, indent=2)
if parsed_text.count("\n") > MAX_OUTPUT_LINES:
click.echo_via_pager(parsed_text)
else:
typer.echo(parsed_text)
except json.JSONDecodeError:
typer.echo("(not valid JSON, raw string below)")
typer.echo(output_str)
# 5) Save?
confirm_msg = (
f"\nIf you are happy with the above output for router '{host}' (NREN: {nren}), "
"shall we save it to the database?"
)
if typer.confirm(confirm_msg, default=False):
try:
with db.database_scope(), transactional(db, logger):
record = BgpStatusPreCheckTable(
router_fqdn=host,
nren=nren,
result=runner.result.model_dump(mode="json") if runner.result else {},
)
db.session.add(record)
except Exception as err:
logger.exception("Failed to save pre-check record")
typer.echo("Error: could not save pre-check to database.", err=True)
raise typer.Exit(2) from err
typer.echo("Pre-check result saved.")
else:
typer.echo("Alright, not saving. You can re-run when ready.")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
app()
...@@ -4,6 +4,7 @@ import structlog ...@@ -4,6 +4,7 @@ import structlog
from orchestrator.db import UtcTimestamp from orchestrator.db import UtcTimestamp
from orchestrator.db.database import BaseModel from orchestrator.db.database import BaseModel
from sqlalchemy import ( from sqlalchemy import (
JSON,
String, String,
text, text,
) )
...@@ -25,3 +26,43 @@ class PartnerTable(BaseModel): ...@@ -25,3 +26,43 @@ class PartnerTable(BaseModel):
updated_at = mapped_column( updated_at = mapped_column(
UtcTimestamp, server_default=text("current_timestamp"), nullable=False, onupdate=text("current_timestamp") UtcTimestamp, server_default=text("current_timestamp"), nullable=False, onupdate=text("current_timestamp")
) )
class BgpStatusPreCheckTable(BaseModel):
"""Database table for storing per router BGP satus precheck results."""
__tablename__ = "bgp_status_pre_checks"
pre_check_id = mapped_column(
String,
server_default=text("uuid_generate_v4"),
primary_key=True,
)
router_fqdn = mapped_column(
String,
nullable=False,
index=True,
comment="The FQDN of the router under check",
)
nren = mapped_column(
String,
nullable=False,
comment="Name of the NREN (used in import file path)",
)
result = mapped_column(
JSON,
nullable=False,
comment="Raw JSON blob returned by LSO bgp_status_pre_check script",
)
created_at = mapped_column(
UtcTimestamp,
server_default=text("current_timestamp"),
nullable=False,
)
updated_at = mapped_column(
UtcTimestamp,
server_default=text("current_timestamp"),
nullable=False,
onupdate=text("current_timestamp"),
)
...@@ -5,7 +5,7 @@ from orchestrator.db.database import BaseModel ...@@ -5,7 +5,7 @@ from orchestrator.db.database import BaseModel
from orchestrator.settings import app_settings from orchestrator.settings import app_settings
from sqlalchemy import engine_from_config, pool, text from sqlalchemy import engine_from_config, pool, text
from gso.db.models import PartnerTable # noqa: F401 from gso.db.models import PartnerTable, BgpStatusPreCheckTable # noqa: F401
# this is the Alembic Config object, which provides # this is the Alembic Config object, which provides
# access to the values within the .ini file in use. # access to the values within the .ini file in use.
......
"""Add bgp_status_pre_checks table.
Revision ID: 27308f1dd850
Revises: 24858fd1d805
Create Date: 2025-06-27 10:00:00.000000
"""
import sqlalchemy as sa
from alembic import op
from orchestrator.db import UtcTimestamp
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = '27308f1dd850'
down_revision = '24858fd1d805'
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
'bgp_status_pre_checks',
sa.Column('pre_check_id', sa.String(), server_default=sa.text('uuid_generate_v4()'), nullable=False),
sa.Column('router_fqdn', sa.String(), nullable=False),
sa.Column('nren', sa.String(), nullable=False),
sa.Column('result', postgresql.JSON(), nullable=False), # type: ignore[no-untyped-call]
sa.Column('created_at', UtcTimestamp(timezone=True), server_default=sa.text('current_timestamp'), nullable=False),
sa.Column('updated_at', UtcTimestamp(timezone=True), server_default=sa.text('current_timestamp'),
nullable=False, onupdate=sa.text('current_timestamp')),
sa.PrimaryKeyConstraint('pre_check_id'),
)
# indexes for faster lookup
op.create_index('ix_bgp_status_pre_checks_router_fqdn', 'bgp_status_pre_checks', ['router_fqdn'])
def downgrade() -> None:
# drop indexes, then table
op.drop_index('ix_bgp_status_pre_checks_router_fqdn', table_name='bgp_status_pre_checks')
op.drop_index('ix_bgp_status_pre_checks_router_id', table_name='bgp_status_pre_checks')
op.drop_table('bgp_status_pre_checks')
"""This module defines types used for precheck operations."""
from enum import StrEnum
from uuid import UUID
from pydantic import BaseModel
class JobStatus(StrEnum):
"""Enumeration of possible job statuses."""
SUCCESSFUL = "successful"
FAILED = "failed"
class ExecutionResult(BaseModel):
"""Model for capturing the result of an executable run."""
output: str
return_code: int
status: JobStatus
class ExecutableRunResponse(BaseModel):
"""Response for running an arbitrary executable."""
job_id: UUID
result: ExecutionResult | None = None
import json
import click
import httpx
import pytest
from orchestrator.db import db
from typer.testing import CliRunner
from gso.cli.prechecks import app
from gso.db.models import BgpStatusPreCheckTable
runner = CliRunner()
# A valid LSO response payload
BASE_RESPONSE = {
"job_id": "2c19843b-c721-4662-8014-b1a7a22f1734",
"result": {
"output": json.dumps({"bgp": {"asn": 65001}, "neighbors": []}),
"return_code": 0,
"status": "successful",
},
}
class DummyResponse:
def __init__(self, json_data, status=200):
self._json = json_data
self.status_code = status
def json(self):
return self._json
def raise_for_status(self):
if not (200 <= self.status_code < 300):
raise httpx.HTTPStatusError("error", request=None, response=self) # noqa: RUF100,EM101
@pytest.fixture()
def mock_http_success(monkeypatch):
"""Return a successful dummy response for httpx.post."""
dummy_resp = DummyResponse(BASE_RESPONSE)
monkeypatch.setattr("httpx.post", lambda *args, **kwargs: dummy_resp) # noqa: ARG005
return dummy_resp
@pytest.fixture()
def mock_http_error(monkeypatch):
"""Simulate httpx.post throwing an exception."""
def raise_exc(*args, **kwargs):
"""Raise an exception to simulate a network error."""
msg = "timeout"
raise Exception(msg) # noqa: TRY002
monkeypatch.setattr("httpx.post", raise_exc)
return raise_exc
@pytest.fixture()
def mock_http_bad_shape(monkeypatch):
"""Return JSON that does not fit ExecutableRunResponse."""
bad = {"unexpected": "data"}
# create a DummyResponse and patch httpx.post
dummy_resp = DummyResponse(bad)
monkeypatch.setattr("httpx.post", lambda *args, **kwargs: dummy_resp) # noqa: ARG005
return dummy_resp
def test_no_save_leaves_table_empty(mock_http_success):
"""If user declines save, table remains empty."""
result = runner.invoke(app, ["rt1.example.com", "SURF"], input="n\n")
assert result.exit_code == 0
assert "not saving" in result.stdout.lower()
# Table should be empty
assert db.session.query(BgpStatusPreCheckTable).count() == 0
def test_prompt_save_yes_persists_record(mock_http_success):
"""Typing 'y' at prompt should also persist."""
result = runner.invoke(app, ["rt1.example.com", "SURF"], input="y\n")
assert result.exit_code == 0
assert db.session.query(BgpStatusPreCheckTable).count() == 1
def test_http_failure_aborts(mock_http_error):
"""Network/timeout errors should abort with exit code 1."""
result = runner.invoke(app, ["rt1.example.com", "SURF"])
assert result.exit_code == 1
# Now stderr is separately captured:
assert "error: failed to call lso: timeout" in result.stdout.lower()
# Table still empty
assert db.session.query(BgpStatusPreCheckTable).count() == 0
def test_invalid_shape_aborts(mock_http_bad_shape):
"""Malformed top-level JSON shape should abort."""
result = runner.invoke(app, ["rt1.example.com", "SURF"])
assert result.exit_code == 1
assert "invalid JSON returned by LSO" in result.stdout
assert db.session.query(BgpStatusPreCheckTable).count() == 0
def test_parse_output_nonjson(mock_http_success):
"""If output is not valid JSON, we still complete without saving."""
# Patch BASE_RESPONSE to use non-JSON output
bad = dict(BASE_RESPONSE)
bad["result"] = dict(bad["result"])
bad["result"]["output"] = "not a json"
# monkeypatch
import httpx as _httpx
_orig = _httpx.post
_httpx.post = lambda *args, **kwargs: DummyResponse(bad) # noqa: ARG005
try:
result = runner.invoke(app, ["rt1.example.com", "SURF"], input="n\n")
assert result.exit_code == 0
assert "(not valid JSON, raw string below)" in result.stdout
finally:
_httpx.post = _orig
def test_pagination_on_large_output(mock_http_success, monkeypatch):
"""Parsed output >50 lines should trigger click.echo_via_pager."""
# Build huge object
big = {"x": ["line"] * 100}
payload = dict(BASE_RESPONSE)
payload["result"] = dict(payload["result"])
payload["result"]["output"] = json.dumps(big)
monkeypatch.setattr("httpx.post", lambda *args, **kwargs: DummyResponse(payload)) # noqa: ARG005
paged = False
def fake_pager(text):
nonlocal paged
paged = True
monkeypatch.setattr(click, "echo_via_pager", fake_pager)
result = runner.invoke(app, ["rt1.example.com", "SURF"], input="n\n")
assert result.exit_code == 0
assert paged, "Expected parsed-output pager for large JSON"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment