in pipeline/ocr/sfn_semaphore/fn-acquire-lock/main.py [0:0]
def handler(event, context):
    """Try to acquire a slot on a DynamoDB-backed concurrency lock item.

    The raw event is parsed with ``AcquireLockEvent`` and a single conditional
    ``update_item`` call is issued against the lock item. The update only applies when
    all of the following hold:

    - ``currentlockcount`` is below the event's ``concurrency_limit``;
    - the item has no attribute named after the event's ``execution_id`` yet;
    - ``nextavailtime`` is earlier than the current timestamp.

    When applied, the update increases ``currentlockcount`` by ``per_item_concurrency``,
    stores ``state_entered_time`` under the attribute named after ``execution_id``, and
    sets ``nextavailtime``.

    Args:
        event: Raw invocation payload; passed to ``AcquireLockEvent`` for parsing.
        context: Invocation context object (not used by this function).

    Returns:
        An empty dict.

    Raises:
        Lambda_DynamoDBResourceNotFound: The conditional check failed and the follow-up
            ``get_item`` returned no item (this is also raised when that follow-up
            ``get_item`` itself fails).
        Lambda_ConditionalCheckFailedException: The conditional check failed but the lock
            item exists.
        Lambda_DynamoDBThrottlingException: ``update_item`` raised one of the throttling
            errors (provisioned throughput exceeded, request limit exceeded, throttling).
        Lambda_OtherDynamoDBError: Any other exception raised while building or sending
            the ``update_item`` request.
    """
    logger.info("Received event: %s", event)
    # Keep the parsed event under its own name rather than rebinding the `event` parameter.
    lock_event = AcquireLockEvent(event)
    # time.time() is only guaranteed second precision, we'd like ms or better for multi-TPS:
    current_timestamp = datetime.now().timestamp()
    ddb_table = ddb.Table(lock_event.ddb_table_name)
    ddb_key = {lock_event.lock_id_attr: lock_event.lock_name}
    try:
        resp = ddb_table.update_item(
            Key=ddb_key,
            ReturnValues="UPDATED_NEW",
            ConditionExpression=" and ".join(
                (
                    "currentlockcount < :limit",
                    "attribute_not_exists(#lockownerid)",
                    "nextavailtime < :currenttime",
                )
            ),
            ExpressionAttributeNames={
                "#currentlockcount": "currentlockcount",
                # The per-owner attribute is named after the execution ID itself:
                "#lockownerid": lock_event.execution_id,
                "#nextavailtime": "nextavailtime",
            },
            ExpressionAttributeValues={
                ":increase": lock_event.per_item_concurrency,
                ":limit": lock_event.concurrency_limit,
                ":currenttime": Decimal(current_timestamp),
                # With a warm-up TPS limit, the next acquisition is allowed 1/limit seconds
                # from now; with no (falsy) limit the next-available time is simply 0.
                ":nextavailtime": Decimal(
                    current_timestamp + (1.0 / lock_event.warmup_tps_limit)
                    if lock_event.warmup_tps_limit
                    else 0
                ),
                ":lockacquiredtime": lock_event.state_entered_time,
            },
            # One tuple element per clause: each SET action carries its own trailing comma
            # so that no two clauses are merged by implicit string-literal concatenation.
            UpdateExpression=" ".join(
                (
                    "SET",
                    "#currentlockcount = #currentlockcount + :increase,",
                    "#lockownerid = :lockacquiredtime,",
                    "#nextavailtime = :nextavailtime",
                )
            ),
        )
    except ddbclient.exceptions.ConditionalCheckFailedException as err_conditional:
        logger.error("Conditional check failed")
        # Because of the nature of the checks enforced by this acquirer (which yields this error
        # even if the lock item does not yet exist), and the fact that we can't yet catch
        # non-existent lock items in the next state because they trigger States.Runtime errors;
        # we'll perform a quick getItem check here and raise a different error if the item is
        # missing:
        try:
            get_resp = ddb_table.get_item(
                Key=ddb_key,
                ProjectionExpression=next(iter(ddb_key)),  # Just the (first) key attribute
                ReturnConsumedCapacity="NONE",
            )
        except Exception:
            logger.exception("Additional exception while checking for lock item existence")
            # Best effort: this placeholder has no "Item", so a failed existence check is
            # reported below in the same way as a missing lock item.
            get_resp = {"Error": "Couldn't check item exists"}
        if not (get_resp and get_resp.get("Item")):
            raise Lambda_DynamoDBResourceNotFound("Lock item does not exist") from err_conditional
        else:
            raise Lambda_ConditionalCheckFailedException(str(err_conditional)) from err_conditional
    except (
        ddbclient.exceptions.ProvisionedThroughputExceededException,
        ddbclient.exceptions.RequestLimitExceeded,
        ddbclient.exceptions.ThrottlingException,
    ) as err_throttle:
        logger.error("DynamoDB throttled")
        raise Lambda_DynamoDBThrottlingException(str(err_throttle)) from err_throttle
    except Exception as err_general:
        logger.error("Other DynamoDB error")
        raise Lambda_OtherDynamoDBError(str(err_general)) from err_general
    logger.info("DDB Response: %s", resp)
    # resp["Attributes"] is a pre-parsed dict direct from attr name to value, including the updated
    # keys. Since some of these are Decimals, Decimals can't be JSON dumped, and the output of this
    # Lambda step is discarded anyway, we'll just return an empty object.
    return {}