def process_labels()

in dags/processing/processing.py [0:0]


def process_labels(file_labels, bucket, key, table_target):
    """Persist the labels detected in one video frame to S3 and DynamoDB.

    The labels are uploaded as JSON next to the source object (same prefix,
    same base name, ``.json`` extension).  A DynamoDB item keyed by
    ``(timestamp, camera)`` is then written and updated with one attribute per
    label (holding the highest confidence seen for that label) plus the
    ``Ped_Count`` / ``Bike_Count`` / ``Motorbike_Count`` totals.

    The frame timestamp is derived from ``key``:

    * the second-to-last path element must start with six ``-``-separated
      integers (passed positionally to ``datetime``, i.e. year, month, day,
      hour, minute, second), optionally followed by ``_<anything>``;
    * the file name must contain a 4-digit frame index, which is multiplied
      by the per-frame duration and added to that base time.  The leading
      ASCII letters of the file name are used as the camera name.

    Args:
        file_labels: Iterable of dicts, each with ``"Name"`` and
            ``"Confidence"`` and optionally ``"Instances"`` (a sized
            collection of detections).  Presumably the ``Labels`` list of an
            Amazon Rekognition response -- confirm against the caller.  Must
            be JSON-serialisable.
        bucket: Name of the S3 bucket to upload the JSON file to.
        key: S3 key of the source frame, e.g.
            ``<prefix>/<Y-M-D-h-m-s>_<suffix>/<camera><NNNN>.<ext>``.
        table_target: Name of the DynamoDB table to write to.

    Raises:
        ValueError: If the file name in ``key`` has no 4-digit frame index,
            or the base-time fields cannot be parsed as a date.
        botocore.exceptions.ClientError: If the initial ``put_item`` fails.
            Failures of the per-label and count updates are only logged.
    """
    # Imports are intentionally kept function-local, as in the original, so
    # the function stays self-contained.
    import boto3
    import json
    import logging
    from botocore.exceptions import ClientError
    import io
    from datetime import datetime, timedelta
    import re

    s3 = boto3.client("s3")
    dynamo = boto3.client("dynamodb")

    # TODO: pull from secrets manager
    # Duration of one frame in milliseconds (see the timedelta below).
    frame_duration = 67

    path_elems = key.split("/")
    file_elems = path_elems[-1].split(".")
    frame_name = file_elems[0]

    # Upload the raw labels as <frame_name>.json under the same prefix.
    file = f"{frame_name}.json"
    payload = io.BytesIO(json.dumps(file_labels).encode())
    upload_path = "/".join(path_elems[0:-1])
    upload_key = f"{upload_path}/{file}"
    logging.info(f"Uploading {bucket}/{upload_key}")
    s3.upload_fileobj(payload, bucket, upload_key)

    # Calculate the absolute timestamp of this frame based on the prefix name
    # and the file index * the duration per frame.
    # "[A-Za-z]*" always matches (possibly the empty string), so .group is safe.
    camera = re.match(r"[A-Za-z]*", frame_name).group(0)
    offset_match = re.search(r"[0-9]{4}", frame_name)
    if offset_match is None:
        raise ValueError(
            f"Cannot find a 4-digit frame index in file name {frame_name!r} "
            f"(key {key!r})"
        )
    file_offset = int(offset_match.group(0))
    print(f"file offset:{file_offset}")

    # Extract the base time for the .bag file from the S3 prefix
    bt_elems = path_elems[-2].split("_")
    bt_elems = bt_elems[0].split("-")
    frame_time = datetime(*[int(x) for x in bt_elems[0:6]])
    print(f"frame time: {frame_time}")

    # Now adjust for the file offset in the file name (e.g. front<offset>.png)
    # as well as the timestamp relative to the start of the mp4 file.
    td = timedelta(milliseconds=(file_offset * frame_duration))
    print(f"td:{td}")
    frame_time = frame_time + td
    timestamp = frame_time.isoformat()
    print(f"ft_iso: {timestamp}")

    db_key = {"timestamp": {"S": timestamp}, "camera": {"S": camera}}

    item = {
        "timestamp": {"S": timestamp},
        "camera": {"S": camera},
        "s3_loc": {"S": key},
    }

    # Put the entry into the table (replaces any existing item with this key).
    dynamo.put_item(TableName=table_target, Item=item)

    # Update the table with each detection
    ped_cnt = 0
    bike_cnt = 0
    motorbike_cnt = 0
    for label in file_labels:
        # Attribute name under which this label's confidence is stored.
        name = label["Name"].replace(" ", "_")

        # Keep a count of the number of people/bikes/motorbikes in the image.
        # Labels that carry an empty "Instances" list (no bounding box) are
        # skipped entirely; labels without the key are still recorded below.
        if "Instances" in label:
            count = len(label["Instances"])
            if count == 0:
                continue
            if name == "Person":
                print(name)
                ped_cnt = ped_cnt + count
            elif name == "Bicycle":
                print(name)
                bike_cnt = bike_cnt + count
            elif name == "Motorcycle":
                print(name)
                motorbike_cnt = motorbike_cnt + count

        # Refer to the attribute through a placeholder: label names may be
        # DynamoDB reserved words or contain characters (e.g. "-") that are
        # not valid in a bare expression token, which would otherwise make
        # the update fail with a ValidationException.
        # Only overwrite when this confidence is higher than the stored one.
        try:
            dynamo.update_item(
                TableName=table_target,
                Key=db_key,
                UpdateExpression="SET #label = :conf",
                ConditionExpression="attribute_not_exists(#label) OR #label < :conf",
                ExpressionAttributeNames={"#label": name},
                ExpressionAttributeValues={":conf": {"N": f'{label["Confidence"]}'}},
            )
        except ClientError as e:
            # Best effort: a failed condition check (lower confidence) or any
            # other service error is logged and the remaining labels proceed.
            logging.warning(e)

    # Record the per-frame totals; failures are logged, not raised.
    try:
        dynamo.update_item(
            TableName=table_target,
            Key=db_key,
            UpdateExpression="SET Ped_Count = :peds, Bike_Count  = :bikes, Motorbike_Count = :motorbikes ",
            ExpressionAttributeValues={
                ":peds": {"N": f"{ped_cnt}"},
                ":bikes": {"N": f"{bike_cnt}"},
                ":motorbikes": {"N": f"{motorbike_cnt}"},
            },
        )
    except ClientError as e:
        logging.warning(e)