in src/stepfunctions/workflow/stepfunctions.py [0:0]
def execute(self, name=None, inputs=None):
    """
    Kick off one execution of this workflow on AWS Step Functions.

    Args:
        name (str, optional): Name for the workflow execution. When omitted, no name is sent with the request and an execution name is auto-generated. (default: None)
        inputs (str, list or dict, optional): Input data for the workflow execution; serialized to JSON before being sent. (default: None)

    Returns:
        stepfunctions.workflow.Execution: An execution instance of the workflow, created with a Running status.

    Raises:
        ValueError: If a workflow input schema is set and `inputs` fails its validation.
        WorkflowNotFound: If this instance has no state machine ARN.
    """
    # Input validation runs first, before the ARN is checked.
    schema = self.workflow_input
    if schema:
        outcome = schema.validate(inputs)
        if outcome.valid is False:
            raise ValueError("Expected run input with the schema: {}".format(schema.get_schema_as_json()))

    arn = self.state_machine_arn
    if arn is None:
        raise WorkflowNotFound("Local workflow instance does not point to an existing workflow on AWS StepFunctions. Before executing a workflow, call Workflow.create(...) or Workflow.attach(...).")

    # Optional request fields are only included when the caller supplied them.
    request = {'stateMachineArn': arn}
    if name is not None:
        request['name'] = name
    if inputs is not None:
        # indent=None is json.dumps' default, i.e. the compact single-line form.
        request['input'] = json.dumps(inputs, indent=4 if self.format_json else None)

    started = self.client.start_execution(**request)
    logger.info("Workflow execution started successfully on AWS Step Functions.")

    # No name is handed to Execution: the start_execution response only
    # carries startDate and executionArn.
    return Execution(
        workflow=self,
        execution_arn=started['executionArn'],
        start_date=started['startDate'],
        status=ExecutionStatus.Running,
        client=self.client,
    )