in src/nova_act/types/state/step.py [0:0]
def from_message(cls, message: dict[str, dict]) -> "Step":
# Extract input data
input_data = message.get("input", {})
model_input = ModelInput(
image=input_data.get("screenshot", ""),
prompt=input_data.get("prompt", ""),
active_url=input_data.get("metadata", {}).get("activeURL", ""),
legacy_workflow_run_id=input_data.get("agentRunCreate", {}).get("workflowRunId", ""),
)
# Extract output data
output_data = message.get("output", {})
model_output = ModelOutput(
awl_raw_program=output_data.get("rawProgramBody", ""), request_id=output_data.get("requestId", "")
)
# Extract timing data
observed_time = dt.fromtimestamp(time.time())
return cls(
model_input=model_input,
model_output=model_output,
observed_time=observed_time,
rawMessage=message,
)