# get_facts.py 1.57 KiB
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
# from orchestrator.types import SubscriptionLifecycle
from orchestrator.types import InputForm, UUIDstr
from orchestrator.workflow import done, init, step, workflow
# from orchestrator.workflows.steps import (
# resync,
# set_status,
# store_process_subscription,
# unsync,
# )
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_types.device import Device
def initial_input_form_generator(
    subscription_id: UUIDstr, organisation: UUIDstr
) -> InputForm:
    """Build the confirmation form shown before facts are gathered.

    Args:
        subscription_id: ID of the device subscription; it is loaded so that
            its description can be shown in the confirmation label.
        organisation: Not used in this function; kept so the signature stays
            compatible with existing callers.

    Returns:
        A ``FormPage`` subclass containing a single confirmation label.
    """
    subscription = Device.from_subscription(subscription_id)

    # NOTE(review): the class name looks copied from a terminate workflow;
    # it is left unchanged in case anything depends on the form's name.
    class TerminateForm(FormPage):
        # Adjacent literals instead of a backslash continuation inside the
        # string: with a backslash, any indentation of the continuation line
        # ends up inside the message shown to the user.
        are_you_sure: Label = (
            "Are you sure you want to get facts from "
            f"{subscription.description}?"
        )

    return TerminateForm
@step("Get facts")
def get_facts(subscription_id: UUIDstr) -> dict:
    """Run the ``get_facts.yaml`` playbook against the subscription's device.

    The device subscription is loaded, the playbook is run through
    ``ansible_runner`` with ``/opt`` as private data directory and the
    device FQDN as inventory, and the captured stdout is returned.

    Args:
        subscription_id: ID of the device subscription to gather facts for.

    Returns:
        A dict with one key, ``"output"``, holding the playbook's stdout as
        a list of lines.
    """
    subscription = Device.from_subscription(subscription_id)

    # Imported inside the step, as in the original code.
    import ansible_runner

    runner = ansible_runner.run(
        private_data_dir="/opt",
        playbook="get_facts.yaml",
        inventory=subscription.device.fqdn,
    )
    # NOTE(review): the run's status / return code is not inspected, so a
    # failed playbook still just returns its output -- confirm this is
    # intended.

    # ``Runner.stdout`` returns a newly opened file handle on each access;
    # read it inside ``with`` so the handle is closed instead of leaked.
    with runner.stdout as stdout_file:
        output = stdout_file.read()

    return {"output": output.splitlines()}
@workflow(
    "Get Facts from Device",
    initial_input_form=wrap_modify_initial_input_form(
        initial_input_form_generator
    ),
    target=Target.SYSTEM,
)
def get_facts_from_device():
    """Define the "Get Facts from Device" workflow.

    The step chain is ``init``, then the ``get_facts`` step, then ``done``.
    """
    step_chain = init >> get_facts >> done
    return step_chain