in backend/lambdas/jobs/stream_processor.py [0:0]
def process_job(job):
job_id = job["Id"]
state = {
k: job[k]
for k in [
"AthenaConcurrencyLimit",
"AthenaQueryMaxRetries",
"DeletionTasksMaxNumber",
"ForgetQueueWaitSeconds",
"Id",
"QueryExecutionWaitSeconds",
"QueryQueueWaitSeconds",
]
}
try:
client.start_execution(
stateMachineArn=state_machine_arn,
name=job_id,
input=json.dumps(state, cls=DecimalEncoder),
)
except client.exceptions.ExecutionAlreadyExists:
logger.warning("Execution %s already exists", job_id)
except (ClientError, ValueError) as e:
emit_event(
job_id,
"Exception",
{
"Error": "ExecutionFailure",
"Cause": "Unable to start StepFunction execution: {}".format(str(e)),
},
"StreamProcessor",
)