from orchestrator.forms import FormPage from orchestrator.forms.validators import Label from orchestrator.targets import Target # from orchestrator.types import SubscriptionLifecycle from orchestrator.types import InputForm, UUIDstr from orchestrator.workflow import done, init, step, workflow # from orchestrator.workflows.steps import ( # resync, # set_status, # store_process_subscription, # unsync, # ) from orchestrator.workflows.utils import wrap_modify_initial_input_form from gso.products.product_types.device import Device def initial_input_form_generator( subscription_id: UUIDstr, organisation: UUIDstr ) -> InputForm: subscription = Device.from_subscription(subscription_id) class TerminateForm(FormPage): are_you_sure: Label = ( f"Are you sure you want to get facts from \ {subscription.description}?" ) return TerminateForm @step("Get facts") def get_facts(subscription_id) -> None: subscription = Device.from_subscription(subscription_id) import ansible_runner r = ansible_runner.run( private_data_dir="/opt", playbook="get_facts.yaml", inventory=subscription.device.fqdn, ) out = r.stdout.read() out_splitted = out.splitlines() return {"output": out_splitted} @workflow( "Get Facts from Device", initial_input_form=wrap_modify_initial_input_form( initial_input_form_generator), target=Target.SYSTEM, ) def get_facts_from_device(): return ( init >> get_facts # >> resync >> done )