def handler()

in pipeline/ocr/sfn_semaphore/fn-acquire-lock/main.py


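Lambda handler that tries to atomically acquire (increment) the DynamoDB semaphore lock for the calling Step Functions execution, enforcing both a total concurrency limit and an optional warm-up TPS rate limit. The handler depends on module-level setup earlier in main.py; a minimal sketch of that wiring, assuming standard boto3 usage (the AcquireLockEvent parser and the Lambda_* exception classes are project-specific and defined elsewhere in this file):

import logging
from datetime import datetime
from decimal import Decimal

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ddb = boto3.resource("dynamodb")
ddbclient = ddb.meta.client  # Same underlying client, so its exception classes apply
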
def handler(event, context):
    logger.info("Received event: %s", event)
    event = AcquireLockEvent(event)

    # time.time() only guarantees second precision; we'd like ms or better for multi-TPS:
    current_timestamp = datetime.now().timestamp()
    ddb_table = ddb.Table(event.ddb_table_name)
    ddb_key = {event.lock_id_attr: event.lock_name}
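    # Atomically increment the lock count, conditional on: the concurrency limit not being
    # reached, this execution not already holding the lock, and (when a warm-up TPS limit is
    # set) the rate limiter's next-available time having passed. The owner attribute is keyed
    # by execution ID and stores the acquisition time: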
    try:
        resp = ddb_table.update_item(
            Key=ddb_key,
            ReturnValues="UPDATED_NEW",
            ConditionExpression=" and ".join(
                (
                    "#currentlockcount < :limit",
                    "attribute_not_exists(#lockownerid)",
                    "#nextavailtime < :currenttime",
                )
            ),
            ExpressionAttributeNames={
                "#currentlockcount": "currentlockcount",
                "#lockownerid": event.execution_id,
                "#nextavailtime": "nextavailtime",
            },
            ExpressionAttributeValues={
                ":increase": event.per_item_concurrency,
                ":limit": event.concurrency_limit,
                ":currenttime": Decimal(current_timestamp),
                ":nextavailtime": Decimal(
                    current_timestamp + (1.0 / event.warmup_tps_limit)
                    if event.warmup_tps_limit
                    else 0
                ),
                ":lockacquiredtime": event.state_entered_time,
            },
            UpdateExpression=" ".join(
                (
                    "SET",
                    "#currentlockcount = #currentlockcount + :increase,",
                    "#lockownerid = :lockacquiredtime,",
                    "#nextavailtime = :nextavailtime",
                )
            ),
        )
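    # Each failure mode below is re-raised as a distinctly-named exception class, presumably
    # so the Step Functions state machine can route it with separate Retry/Catch rules: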
    except ddbclient.exceptions.ConditionalCheckFailedException as err_conditional:
        logger.error("Conditional check failed")
        # This acquirer's condition checks yield this error even if the lock item does not yet
        # exist, and the next state can't catch non-existent lock items because they trigger
        # States.Runtime errors. So perform a quick GetItem here and raise a different error
        # if the item is missing:
        try:
            get_resp = ddb_table.get_item(
                Key=ddb_key,
                ProjectionExpression=next(iter(ddb_key)),  # Just the (first) key attribute
                ReturnConsumedCapacity="NONE",
            )
        except Exception:
            logger.exception("Additional exception while checking for lock item existence")
            get_resp = {"Error": "Couldn't check item exists"}

        if not (get_resp and get_resp.get("Item")):
            raise Lambda_DynamoDBResourceNotFound("Lock item does not exist") from err_conditional
        else:
            raise Lambda_ConditionalCheckFailedException(str(err_conditional)) from err_conditional
    except (
        ddbclient.exceptions.ProvisionedThroughputExceededException,
        ddbclient.exceptions.RequestLimitExceeded,
        ddbclient.exceptions.ThrottlingException,
    ) as err_throttle:
        logger.error("DynamoDB throttled")
        raise Lambda_DynamoDBThrottlingException(str(err_throttle)) from err_throttle
    except Exception as err_general:
        logger.error("Other DynamoDB error")
        raise Lambda_OtherDynamoDBError(str(err_general)) from err_general

    logger.info("DDB Response: %s", resp)
    # resp["Attributes"] is a pre-parsed dict direct from attr name to value, including the updated
    # keys. Since some of these are Decimals, Decimals can't be JSON dumped, and the output of this
    # Lambda step is discarded anyway, we'll just return an empty object.
    return {}
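
For reference, a hypothetical task input for this handler, assuming AcquireLockEvent maps the event keys one-to-one onto the attributes read above (all names and values below are illustrative, not taken from the pipeline's actual state machine definition):

# Hypothetical example event (field names inferred from the attribute reads in handler()):
example_event = {
    "ddb_table_name": "SemaphoreLocksTable",
    "lock_id_attr": "LockName",
    "lock_name": "OCRLock",
    "execution_id": "arn:aws:states:us-east-1:123456789012:execution:Pipeline:demo",
    "per_item_concurrency": 1,
    "concurrency_limit": 10,
    "warmup_tps_limit": 2,  # Falsy/absent disables the warm-up rate limit
    "state_entered_time": "2024-01-01T00:00:00.000Z",
}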