diff --git a/gso/__init__.py b/gso/__init__.py
index 99f1d386081a807a5c7b34527ba890cafe7aab1e..beabac68199a877aecf491bab7d6325a34097714 100644
--- a/gso/__init__.py
+++ b/gso/__init__.py
@@ -65,11 +65,12 @@ def init_gso_app() -> OrchestratorCore:
def init_cli_app() -> typer.Typer:
"""Initialise GSO as a CLI application."""
-    from gso.cli import imports, netbox, schedule  # noqa: PLC0415
+    from gso.cli import imports, netbox, prechecks, schedule  # noqa: PLC0415
     cli_app.add_typer(imports.app, name="import-cli")
     cli_app.add_typer(netbox.app, name="netbox-cli")
     cli_app.add_typer(schedule.app, name="schedule-cli")
+    cli_app.add_typer(prechecks.app, name="precheck-cli")
     return cli_app()
diff --git a/gso/cli/prechecks.py b/gso/cli/prechecks.py
new file mode 100644
index 0000000000000000000000000000000000000000..55f5df698fde5462159ccedca9f8a41152d8ea48
--- /dev/null
+++ b/gso/cli/prechecks.py
@@ -0,0 +1,107 @@
+#!/usr/bin/env python3
+"""CLI for GSO pre-check using LSO remote exec endpoint."""
+
+import json
+import logging
+
+import click
+import httpx
+import structlog
+import typer
+from orchestrator.db import db
+from orchestrator.db.database import transactional
+from pydantic import ValidationError
+
+from gso import settings
+from gso.db.models import BgpStatusPreCheckTable
+from gso.utils.types.precheck import ExecutableRunResponse
+
+logger = structlog.get_logger(__name__)
+app = typer.Typer()
+
+MAX_OUTPUT_LINES = 50 # Max lines to display before paging
+
+
+@app.command()
+def bgp_status(
+ host: str = typer.Argument(..., help="FQDN of the router to pre-check"),
+ nren: str = typer.Argument(..., help="NREN name for import file path"),
+) -> None:
+ """Trigger the bgp_status_pre-check script on LSO, wait for it to finish.
+
+ pretty-print the JSON result inline,
+ parse the `output` field as JSON-string and page it if large,
+ and optionally save to the database.
+ """
+ oss = settings.load_oss_params()
+ p = oss.PROVISIONING_PROXY
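+    # The execute endpoint takes the executable name, its CLI arguments, and an
+    # is_async flag; with is_async=False the call blocks until the script has
+    # finished and the result is embedded in the response.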
+ payload = {
+ "executable_name": "bgp_status_pre_check.py",
+ "args": [host, nren],
+ "is_async": False,
+ }
+ url = f"{p.scheme}://{p.api_base}/api/execute/"
+
+ # 1) Call LSO
+ try:
+ resp = httpx.post(url, json=payload, timeout=30)
+ resp.raise_for_status()
+ except Exception as e:
+ logger.exception("LSO call failed: %s")
+ typer.echo(f"Error: failed to call LSO: {e}", err=True)
+ raise typer.Exit(1) from e
+
+ # 2) Validate response
+ try:
+ runner = ExecutableRunResponse(**resp.json())
+ except ValidationError as e:
+ logger.exception("Invalid response from LSO")
+ typer.echo("Error: invalid JSON returned by LSO:", err=True)
+ typer.echo(str(e), err=True)
+ raise typer.Exit(1) from e
+
+ # 3) Print full response inline
+ full = runner.model_dump(mode="json")
+ typer.echo(typer.style("\nFull LSO response:", fg=typer.colors.GREEN))
+ typer.echo(json.dumps(full, indent=2))
+
+ # 4) Parse and pretty-print the `output` field, with pagination if large
+ output_str = runner.result.output if runner.result else ""
+ typer.echo(typer.style("\nParsed `result.output` as JSON:", fg=typer.colors.CYAN))
+ try:
+ parsed = json.loads(output_str)
+ parsed_text = json.dumps(parsed, indent=2)
+ if parsed_text.count("\n") > MAX_OUTPUT_LINES:
+ click.echo_via_pager(parsed_text)
+ else:
+ typer.echo(parsed_text)
+ except json.JSONDecodeError:
+ typer.echo("(not valid JSON, raw string below)")
+ typer.echo(output_str)
+
+ # 5) Save?
+ confirm_msg = (
+ f"\nIf you are happy with the above output for router '{host}' (NREN: {nren}), "
+ "shall we save it to the database?"
+ )
+ if typer.confirm(confirm_msg, default=False):
+ try:
+ with db.database_scope(), transactional(db, logger):
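+                # transactional() commits on success and rolls back if the block raises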
+ record = BgpStatusPreCheckTable(
+ router_fqdn=host,
+ nren=nren,
+ result=runner.result.model_dump(mode="json") if runner.result else {},
+ )
+ db.session.add(record)
+ except Exception as err:
+ logger.exception("Failed to save pre-check record")
+ typer.echo("Error: could not save pre-check to database.", err=True)
+ raise typer.Exit(2) from err
+ typer.echo("Pre-check result saved.")
+ else:
+ typer.echo("Alright, not saving. You can re-run when ready.")
+
+
+if __name__ == "__main__":
+ logging.basicConfig(level=logging.INFO)
+ app()
diff --git a/gso/db/models.py b/gso/db/models.py
index c6382b1c81d06f9192ed4f416e186a2d990a5a45..5ac6a46afea88e5cc3ed96b2d9c8716ce7232834 100644
--- a/gso/db/models.py
+++ b/gso/db/models.py
@@ -4,6 +4,7 @@ import structlog
from orchestrator.db import UtcTimestamp
from orchestrator.db.database import BaseModel
from sqlalchemy import (
+ JSON,
     String,
     text,
 )
@@ -25,3 +26,43 @@ class PartnerTable(BaseModel):
     updated_at = mapped_column(
         UtcTimestamp, server_default=text("current_timestamp"), nullable=False, onupdate=text("current_timestamp")
     )
+
+
+class BgpStatusPreCheckTable(BaseModel):
+ """Database table for storing per router BGP satus precheck results."""
+
+ __tablename__ = "bgp_status_pre_checks"
+
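+    # Assumes the PostgreSQL uuid-ossp extension is available, so that
+    # uuid_generate_v4() can serve as the server-side default below.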
+ pre_check_id = mapped_column(
+ String,
+ server_default=text("uuid_generate_v4"),
+ primary_key=True,
+ )
+ router_fqdn = mapped_column(
+ String,
+ nullable=False,
+ index=True,
+ comment="The FQDN of the router under check",
+ )
+ nren = mapped_column(
+ String,
+ nullable=False,
+ comment="Name of the NREN (used in import file path)",
+ )
+ result = mapped_column(
+ JSON,
+ nullable=False,
+ comment="Raw JSON blob returned by LSO bgp_status_pre_check script",
+ )
+
+ created_at = mapped_column(
+ UtcTimestamp,
+ server_default=text("current_timestamp"),
+ nullable=False,
+ )
+ updated_at = mapped_column(
+ UtcTimestamp,
+ server_default=text("current_timestamp"),
+ nullable=False,
+ onupdate=text("current_timestamp"),
+ )
diff --git a/gso/migrations/env.py b/gso/migrations/env.py
index fb535946527dd0a487e75c52d3f6856b9f4eda34..198b6499028754c353efc9e76c1fb7eaa7822bc4 100644
--- a/gso/migrations/env.py
+++ b/gso/migrations/env.py
@@ -5,7 +5,7 @@ from orchestrator.db.database import BaseModel
from orchestrator.settings import app_settings
from sqlalchemy import engine_from_config, pool, text
-from gso.db.models import PartnerTable # noqa: F401
+from gso.db.models import BgpStatusPreCheckTable, PartnerTable  # noqa: F401
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
diff --git a/gso/migrations/versions/2025-06-27_27308f1dd850_add_bgp_status_pre_check_table_.py b/gso/migrations/versions/2025-06-27_27308f1dd850_add_bgp_status_pre_check_table_.py
new file mode 100644
index 0000000000000000000000000000000000000000..91503c32a88d06a788efb9a18ec293b8bb29ee19
--- /dev/null
+++ b/gso/migrations/versions/2025-06-27_27308f1dd850_add_bgp_status_pre_check_table_.py
@@ -0,0 +1,40 @@
+"""Add bgp_status_pre_checks table.
+
+Revision ID: 27308f1dd850
+Revises: 24858fd1d805
+Create Date: 2025-06-27 10:00:00.000000
+
+"""
+import sqlalchemy as sa
+from alembic import op
+from orchestrator.db import UtcTimestamp
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = '27308f1dd850'
+down_revision = '24858fd1d805'
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+ op.create_table(
+ 'bgp_status_pre_checks',
+ sa.Column('pre_check_id', sa.String(), server_default=sa.text('uuid_generate_v4()'), nullable=False),
+ sa.Column('router_fqdn', sa.String(), nullable=False),
+ sa.Column('nren', sa.String(), nullable=False),
+ sa.Column('result', postgresql.JSON(), nullable=False), # type: ignore[no-untyped-call]
+ sa.Column('created_at', UtcTimestamp(timezone=True), server_default=sa.text('current_timestamp'), nullable=False),
+ sa.Column('updated_at', UtcTimestamp(timezone=True), server_default=sa.text('current_timestamp'),
+ nullable=False, onupdate=sa.text('current_timestamp')),
+ sa.PrimaryKeyConstraint('pre_check_id'),
+ )
+ # indexes for faster lookup
+ op.create_index('ix_bgp_status_pre_checks_router_fqdn', 'bgp_status_pre_checks', ['router_fqdn'])
+
+
+def downgrade() -> None:
+    # drop the index, then the table
+    op.drop_index('ix_bgp_status_pre_checks_router_fqdn', table_name='bgp_status_pre_checks')
+ op.drop_table('bgp_status_pre_checks')
diff --git a/gso/utils/types/precheck.py b/gso/utils/types/precheck.py
new file mode 100644
index 0000000000000000000000000000000000000000..db430fcb1a5f6d689947c2e9e4bb6bf1f4ebe55e
--- /dev/null
+++ b/gso/utils/types/precheck.py
@@ -0,0 +1,28 @@
+"""This module defines types used for precheck operations."""
+
+from enum import StrEnum
+from uuid import UUID
+
+from pydantic import BaseModel
+
+
+class JobStatus(StrEnum):
+ """Enumeration of possible job statuses."""
+
+ SUCCESSFUL = "successful"
+ FAILED = "failed"
+
+
+class ExecutionResult(BaseModel):
+ """Model for capturing the result of an executable run."""
+
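+    # `output` holds the executable's stdout; in this flow it is expected to be
+    # a JSON document serialised to a string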
+ output: str
+ return_code: int
+ status: JobStatus
+
+
+class ExecutableRunResponse(BaseModel):
+ """Response for running an arbitrary executable."""
+
+ job_id: UUID
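+    # result stays None when no result is available yet, e.g. for a run started with is_async=True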
+ result: ExecutionResult | None = None
diff --git a/test/cli/test_pre_checks.py b/test/cli/test_pre_checks.py
new file mode 100644
index 0000000000000000000000000000000000000000..a3d95f65fd4a6773bd6045a9f28e469ae89dd153
--- /dev/null
+++ b/test/cli/test_pre_checks.py
@@ -0,0 +1,141 @@
+import json
+
+import click
+import httpx
+import pytest
+from orchestrator.db import db
+from typer.testing import CliRunner
+
+from gso.cli.prechecks import app
+from gso.db.models import BgpStatusPreCheckTable
+
+runner = CliRunner()
+
+# A valid LSO response payload
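+# (note that `result.output` is itself JSON serialised to a string, which the CLI parses separately)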
+BASE_RESPONSE = {
+ "job_id": "2c19843b-c721-4662-8014-b1a7a22f1734",
+ "result": {
+ "output": json.dumps({"bgp": {"asn": 65001}, "neighbors": []}),
+ "return_code": 0,
+ "status": "successful",
+ },
+}
+
+
+class DummyResponse:
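+    """Minimal stand-in for an httpx.Response, sufficient for the CLI under test."""
+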
+ def __init__(self, json_data, status=200):
+ self._json = json_data
+ self.status_code = status
+
+ def json(self):
+ return self._json
+
+ def raise_for_status(self):
+ if not (200 <= self.status_code < 300):
+ raise httpx.HTTPStatusError("error", request=None, response=self) # noqa: RUF100,EM101
+
+
+@pytest.fixture()
+def mock_http_success(monkeypatch):
+ """Return a successful dummy response for httpx.post."""
+ dummy_resp = DummyResponse(BASE_RESPONSE)
+ monkeypatch.setattr("httpx.post", lambda *args, **kwargs: dummy_resp) # noqa: ARG005
+ return dummy_resp
+
+
+@pytest.fixture()
+def mock_http_error(monkeypatch):
+ """Simulate httpx.post throwing an exception."""
+
+ def raise_exc(*args, **kwargs):
+ """Raise an exception to simulate a network error."""
+ msg = "timeout"
+ raise Exception(msg) # noqa: TRY002
+
+ monkeypatch.setattr("httpx.post", raise_exc)
+ return raise_exc
+
+
+@pytest.fixture()
+def mock_http_bad_shape(monkeypatch):
+ """Return JSON that does not fit ExecutableRunResponse."""
+ bad = {"unexpected": "data"}
+ # create a DummyResponse and patch httpx.post
+ dummy_resp = DummyResponse(bad)
+ monkeypatch.setattr("httpx.post", lambda *args, **kwargs: dummy_resp) # noqa: ARG005
+ return dummy_resp
+
+
+def test_no_save_leaves_table_empty(mock_http_success):
+ """If user declines save, table remains empty."""
+ result = runner.invoke(app, ["rt1.example.com", "SURF"], input="n\n")
+ assert result.exit_code == 0
+ assert "not saving" in result.stdout.lower()
+ # Table should be empty
+ assert db.session.query(BgpStatusPreCheckTable).count() == 0
+
+
+def test_prompt_save_yes_persists_record(mock_http_success):
+ """Typing 'y' at prompt should also persist."""
+ result = runner.invoke(app, ["rt1.example.com", "SURF"], input="y\n")
+ assert result.exit_code == 0
+ assert db.session.query(BgpStatusPreCheckTable).count() == 1
+
+
+def test_http_failure_aborts(mock_http_error):
+ """Network/timeout errors should abort with exit code 1."""
+ result = runner.invoke(app, ["rt1.example.com", "SURF"])
+ assert result.exit_code == 1
+    # CliRunner mixes stderr into stdout by default, so the error message shows up there:
+ assert "error: failed to call lso: timeout" in result.stdout.lower()
+
+ # Table still empty
+ assert db.session.query(BgpStatusPreCheckTable).count() == 0
+
+
+def test_invalid_shape_aborts(mock_http_bad_shape):
+ """Malformed top-level JSON shape should abort."""
+ result = runner.invoke(app, ["rt1.example.com", "SURF"])
+ assert result.exit_code == 1
+ assert "invalid JSON returned by LSO" in result.stdout
+ assert db.session.query(BgpStatusPreCheckTable).count() == 0
+
+
+def test_parse_output_nonjson(monkeypatch):
+    """If output is not valid JSON, we still complete without saving."""
+    # Copy BASE_RESPONSE but replace the output with a non-JSON string
+    bad = dict(BASE_RESPONSE)
+    bad["result"] = dict(bad["result"])
+    bad["result"]["output"] = "not a json"
+    monkeypatch.setattr("httpx.post", lambda *args, **kwargs: DummyResponse(bad))  # noqa: ARG005
+
+    result = runner.invoke(app, ["rt1.example.com", "SURF"], input="n\n")
+    assert result.exit_code == 0
+    assert "(not valid JSON, raw string below)" in result.stdout
+
+
+def test_pagination_on_large_output(monkeypatch):
+ """Parsed output >50 lines should trigger click.echo_via_pager."""
+ # Build huge object
+ big = {"x": ["line"] * 100}
+ payload = dict(BASE_RESPONSE)
+ payload["result"] = dict(payload["result"])
+ payload["result"]["output"] = json.dumps(big)
+ monkeypatch.setattr("httpx.post", lambda *args, **kwargs: DummyResponse(payload)) # noqa: ARG005
+
+ paged = False
+
+    def fake_pager(text):  # noqa: ARG001
+ nonlocal paged
+ paged = True
+
+ monkeypatch.setattr(click, "echo_via_pager", fake_pager)
+ result = runner.invoke(app, ["rt1.example.com", "SURF"], input="n\n")
+ assert result.exit_code == 0
+ assert paged, "Expected parsed-output pager for large JSON"