in backend/lambdas/jobs/handlers.py [0:0]
def list_job_events_handler(event, context):
    """Return one page of JobEvent items for a job.

    Reads ``job_id`` from the path parameters and the optional ``page_size``
    (default 20) and ``start_at`` (default ``"0"``) query string parameters.
    Repeated ``filter`` query parameters of the form ``attribute=prefix`` are
    combined into the DynamoDB filter expression as ``begins_with`` conditions.

    Args:
        event: The invocation event; ``pathParameters``,
            ``queryStringParameters`` and ``multiValueQueryStringParameters``
            are read from it.
        context: The invocation context (unused).

    Returns:
        ``{"statusCode": 404}`` when no job item is found, otherwise a dict
        with ``statusCode`` 200 and a JSON ``body`` holding ``JobEvents`` and,
        when it is not ``None``, ``NextStart``.

    Raises:
        ValueError: If ``page_size`` or the leading ``#``-separated component
            of ``start_at`` is not an integer, if ``start_at`` is beyond the
            job's watermark boundary, or if a filter contains no ``=``.
    """
    # Input parsing. Each map is defaulted on its own so that a missing
    # multi-value map never leaves ``mvqs`` as None when ``qs`` is populated.
    job_id = event["pathParameters"]["job_id"]
    qs = event.get("queryStringParameters") or {}
    mvqs = event.get("multiValueQueryStringParameters") or {}
    page_size = int(qs.get("page_size", 20))
    start_at = qs.get("start_at", "0")

    # Check the job exists
    job = table.get_item(Key={"Id": job_id, "Sk": job_id}).get("Item")
    if not job:
        return {"statusCode": 404}

    # Upper bound for an acceptable watermark: one past the job finish time
    # (or the current time when the job has no JobFinishTime), scaled by 1000.
    watermark_boundary_mu = (job.get("JobFinishTime", utc_timestamp()) + 1) * 1000
    # Check the watermark is not "future"
    if int(start_at.split("#")[0]) > watermark_boundary_mu:
        raise ValueError("Watermark {} is out of bounds for this job".format(start_at))

    # Apply filters
    filter_expression = Attr("Type").eq("JobEvent")
    user_filters = mvqs.get("filter", [])
    for f in user_filters:
        # Split on the first "=" only so that values containing "=" are kept
        # intact instead of failing to unpack.
        k, v = f.split("=", 1)
        filter_expression = filter_expression & Attr(k).begins_with(v)

    # Because result may contain both JobEvent and Job items, we request max page_size+1 items then apply the type
    # filter as FilterExpression. We then limit the list size to the requested page size in case the number of
    # items after filtering is still page_size+1 i.e. the Job item wasn't on the page.
    # When user filters are present a fixed limit of 100 is used instead.
    query_limit = 100 if user_filters else page_size + 1
    items = []
    query_start_key = str(start_at)
    last_evaluated = None
    last_query_size = 0
    while len(items) < page_size:
        resp = table.query(
            KeyConditionExpression=Key("Id").eq(job_id),
            ScanIndexForward=True,
            FilterExpression=filter_expression,
            Limit=query_limit,
            ExclusiveStartKey={"Id": job_id, "Sk": query_start_key},
        )
        results = resp.get("Items", [])
        last_query_size = len(results)
        # Take only as many items as are still needed to fill the page.
        items.extend(results[: page_size - len(items)])
        query_start_key = resp.get("LastEvaluatedKey", {}).get("Sk")
        if not query_start_key:
            # No LastEvaluatedKey in the response: nothing further to query.
            break
        last_evaluated = query_start_key

    next_start = _get_watermark(
        items, start_at, page_size, job["JobStatus"], last_evaluated, last_query_size
    )

    # Omit keys whose value is None (e.g. no NextStart) from the response body.
    resp = {
        k: v
        for k, v in {"JobEvents": items, "NextStart": next_start}.items()
        if v is not None
    }
    return {"statusCode": 200, "body": json.dumps(resp, cls=DecimalEncoder)}