def list_job_events_handler()

in backend/lambdas/jobs/handlers.py [0:0]


def list_job_events_handler(event, context):
    """List the events recorded for a single job, one page at a time.

    Reads ``pathParameters.job_id`` plus these optional query string
    parameters (the event is presumably an API Gateway proxy event -- confirm
    against the caller):

    * ``page_size`` -- maximum number of events to return (default 20).
    * ``start_at`` -- watermark to resume from (default ``"0"``); the part
      before the first ``#`` must parse as an integer.
    * ``filter`` -- repeatable ``key=value`` entries; each one adds a
      ``begins_with(key, value)`` condition to the DynamoDB filter expression.

    Args:
        event: Lambda invocation event.
        context: Lambda context object (not used here).

    Returns:
        ``{"statusCode": 404}`` when no job item exists for ``job_id``;
        otherwise ``{"statusCode": 200, "body": <json>}`` where the JSON body
        holds ``JobEvents`` and, when ``_get_watermark`` returns a non-None
        value, ``NextStart``.

    Raises:
        ValueError: if ``page_size`` or the ``start_at`` timestamp prefix is
            not an integer, if ``start_at`` lies beyond the job's finish time
            (or the current time when the job has no finish time), or if a
            ``filter`` entry contains no ``=``.
    """
    # Input parsing. The single- and multi-valued query dicts may each be
    # missing/None, so default them independently (previously a present
    # ``queryStringParameters`` with a missing multi-value dict crashed below).
    job_id = event["pathParameters"]["job_id"]
    qs = event.get("queryStringParameters") or {}
    mvqs = event.get("multiValueQueryStringParameters") or {}
    page_size = int(qs.get("page_size", 20))
    start_at = qs.get("start_at", "0")

    # Check the job exists
    job = table.get_item(Key={"Id": job_id, "Sk": job_id}).get("Item")
    if not job:
        return {"statusCode": 404}

    # Only ask for the current time when the job has no recorded finish time.
    finish_time = job.get("JobFinishTime")
    if finish_time is None:
        finish_time = utc_timestamp()
    # Scaled by 1000 to match the watermark's timestamp units
    # (presumably seconds -> milliseconds; confirm against _get_watermark).
    watermark_boundary_mu = (finish_time + 1) * 1000

    # Reject watermarks that point past the end of the job ("future" watermarks).
    if int(start_at.split("#")[0]) > watermark_boundary_mu:
        raise ValueError("Watermark {} is out of bounds for this job".format(start_at))

    # Apply filters. Split on the first "=" only so filter values may themselves
    # contain "=".
    filter_expression = Attr("Type").eq("JobEvent")
    user_filters = mvqs.get("filter") or []
    for raw_filter in user_filters:
        k, sep, v = raw_filter.partition("=")
        if not sep:
            raise ValueError("Invalid filter {}: expected key=value".format(raw_filter))
        filter_expression = filter_expression & Attr(k).begins_with(v)

    # Because result may contain both JobEvent and Job items, we request max page_size+1 items then apply the type
    # filter as FilterExpression. We then limit the list size to the requested page size in case the number of
    # items after filtering is still page_size+1 i.e. the Job item wasn't on the page.
    # With user filters, read larger pages (presumably because many items may be
    # filtered out per query, so this saves round trips).
    query_limit = 100 if user_filters else page_size + 1
    items = []
    query_start_key = start_at
    last_evaluated = None
    last_query_size = 0
    while len(items) < page_size:
        query_resp = table.query(
            KeyConditionExpression=Key("Id").eq(job_id),
            ScanIndexForward=True,
            FilterExpression=filter_expression,
            Limit=query_limit,
            ExclusiveStartKey={"Id": job_id, "Sk": query_start_key},
        )
        results = query_resp.get("Items", [])
        last_query_size = len(results)
        # Never collect more than page_size items in total.
        items.extend(results[: page_size - len(items)])
        # No LastEvaluatedKey means the query reached the end of this partition.
        query_start_key = query_resp.get("LastEvaluatedKey", {}).get("Sk")
        if not query_start_key:
            break
        last_evaluated = query_start_key

    next_start = _get_watermark(
        items, start_at, page_size, job["JobStatus"], last_evaluated, last_query_size
    )

    # Only include NextStart when _get_watermark produced a value.
    body = {"JobEvents": items}
    if next_start is not None:
        body["NextStart"] = next_start

    return {"statusCode": 200, "body": json.dumps(body, cls=DecimalEncoder)}