def lambda_handler()

in backend/shopping-cart-service/migrate_cart.py [0:0]


def lambda_handler(event, context):
    """
    Update cart table to use user identifier instead of anonymous cookie value as a key. This will be called when a user
    is logged in.
    """

    cart_id, _ = get_cart_id(event["headers"])
    try:
        # Because this method is authorized at API gateway layer, we don't need to validate the JWT claims here
        user_id = event["requestContext"]["authorizer"]["claims"]["sub"]
        logger.info("Migrating cart_id %s to user_id %s", cart_id, user_id)
    except KeyError:

        return {
            "statusCode": 400,
            "headers": get_headers(cart_id),
            "body": json.dumps({"message": "Invalid user"}),
        }

    # Get all cart items belonging to the user's anonymous identity
    response = table.query(
        KeyConditionExpression=Key("pk").eq(f"cart#{cart_id}")
        & Key("sk").begins_with("product#")
    )
    unauth_cart = response["Items"]

    # Since there's no batch operation available for updating items, and there's no dependency between them, we can
    # run them in parallel threads.
    thread_list = []

    for item in unauth_cart:
        # Store items with user identifier as pk instead of "unauthenticated" cart ID
        # Using threading library to perform updates in parallel
        ddb_updateitem_thread = threading.Thread(
            target=update_item, args=(user_id, item)
        )
        thread_list.append(ddb_updateitem_thread)
        ddb_updateitem_thread.start()

        # Delete items with unauthenticated cart ID
        # Rather than deleting directly, push to SQS queue to handle asynchronously
        queue.send_message(MessageBody=json.dumps(item, default=handle_decimal_type))

    for ddb_thread in thread_list:
        ddb_thread.join()  # Block main thread until all updates finished

    if unauth_cart:
        metrics.add_metric(name="CartMigrated", unit="Count", value=1)

    response = table.query(
        KeyConditionExpression=Key("pk").eq(f"user#{user_id}")
        & Key("sk").begins_with("product#"),
        ProjectionExpression="sk,quantity,productDetail",
        ConsistentRead=True,  # Perform a strongly consistent read here to ensure we get correct values after updates
    )

    product_list = response.get("Items", [])
    for product in product_list:
        product.update(
            (k, v.replace("product#", "")) for k, v in product.items() if k == "sk"
        )

    return {
        "statusCode": 200,
        "headers": get_headers(cart_id),
        "body": json.dumps({"products": product_list}, default=handle_decimal_type),
    }