def ddb_to_event()

in shared/src/ecom/ecom/eventbridge.py [0:0]


def ddb_to_event(
        ddb_record: dict,
        event_bus_name: str,
        source: str,
        object_type: str,
        resource_key: str
    ) -> dict:
    """
    Transforms a DynamoDB Streams record into an EventBridge event

    For this function to works, you need to have a StreamViewType of
    NEW_AND_OLD_IMAGES.
    """

    event = {
        "Time": datetime.now(),
        "Source": source,
        "Resources": [
            str(deserialize(ddb_record["dynamodb"]["Keys"][resource_key]))
        ],
        "EventBusName": event_bus_name
    }

    # Inject X-Ray trace ID
    trace_id = os.environ.get("_X_AMZN_TRACE_ID", None)
    if trace_id:
        event["TraceHeader"] = trace_id

    # Created event
    if ddb_record["eventName"].upper() == "INSERT":
        event["DetailType"] = "{}Created".format(object_type)
        event["Detail"] = json.dumps({
            k: deserialize(v)
            for k, v
            in ddb_record["dynamodb"]["NewImage"].items()
        }, cls=Encoder)

    # Deleted event
    elif ddb_record["eventName"].upper() == "REMOVE":
        event["DetailType"] = "{}Deleted".format(object_type)
        event["Detail"] = json.dumps({
            k: deserialize(v)
            for k, v
            in ddb_record["dynamodb"]["OldImage"].items()
        }, cls=Encoder)

    elif ddb_record["eventName"].upper() == "MODIFY":
        new = {
            k: deserialize(v)
            for k, v
            in ddb_record["dynamodb"]["NewImage"].items()
        }
        old = {
            k: deserialize(v)
            for k, v
            in ddb_record["dynamodb"]["OldImage"].items()
        }

        # Old keys not in NewImage
        changed = [k for k in old.keys() if k not in new.keys()]
        for k in new.keys():
            # New keys not in OldImage
            if k not in old.keys():
                changed.append(k)
            # New keys that are not equal to old values
            elif new[k] != old[k]:
                changed.append(k)

        event["DetailType"] = "{}Modified".format(object_type)
        event["Detail"] = json.dumps({
            "new": new,
            "old": old,
            "changed": changed
        }, cls=Encoder)

    else:
        raise ValueError("Wrong eventName value for DynamoDB event: {}".format(ddb_record["eventName"]))

    return event