in src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java [1126:1218]
/**
 * Sends a heartbeat for the lock carried by {@code options}: issues a conditional UpdateItem that writes a new
 * record version number and lease duration (and optionally replaces or removes the lock's data), then applies
 * the same changes to the local {@link LockItem}.
 *
 * @param options the lock item to heartbeat, plus optional data, delete-data flag and lease duration/time unit
 * @throws NullPointerException     if {@code options} or its lock item is null, or if a lease duration is supplied
 *                                  without a time unit
 * @throws IllegalArgumentException if deleteData is true and data is also present
 * @throws LockNotGrantedException  if the lock item is expired, released, has a different owner name than this
 *                                  client, or the conditional update fails
 * @throws AwsServiceException      for any other service failure, unless {@code holdLockOnServiceUnavailable} is
 *                                  set and the response status is SERVICE_UNAVAILABLE
 */
public void sendHeartbeat(final SendHeartbeatOptions options) {
    Objects.requireNonNull(options, "options is required");
    Objects.requireNonNull(options.getLockItem(), "Cannot send heartbeat for null lock");
    final boolean deleteData = options.getDeleteData() != null && options.getDeleteData();
    if (deleteData && options.getData().isPresent()) {
        throw new IllegalArgumentException("data must not be present if deleteData is true");
    }

    // Fall back to the client's lease duration unless the caller asked for a specific one.
    long leaseDurationToEnsureInMilliseconds = this.leaseDurationInMilliseconds;
    if (options.getLeaseDurationToEnsure() != null) {
        Objects.requireNonNull(options.getTimeUnit(), "TimeUnit must not be null if leaseDurationToEnsure is not null");
        leaseDurationToEnsureInMilliseconds = options.getTimeUnit().toMillis(options.getLeaseDurationToEnsure());
    }

    final LockItem lockItem = options.getLockItem();
    // A lock that is expired, released, or owned by someone else cannot be heartbeated; stop tracking it.
    if (lockItem.isExpired() || !lockItem.getOwnerName().equals(this.ownerName) || lockItem.isReleased()) {
        this.locks.remove(lockItem.getUniqueIdentifier());
        throw new LockNotGrantedException("Cannot send heartbeat because lock is not granted");
    }

    synchronized (lockItem) {
        //Set up condition for UpdateItem. Basically any changes require:
        //1. I own the lock
        //2. I know the current version number
        //3. The lock already exists (UpdateItem API can cause a new item to be created if you do not condition the primary keys with attribute_exists)
        final String conditionalExpression;
        final Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(RVN_VALUE_EXPRESSION_VARIABLE, AttributeValue.builder().s(lockItem.getRecordVersionNumber()).build());
        expressionAttributeValues.put(OWNER_NAME_VALUE_EXPRESSION_VARIABLE, AttributeValue.builder().s(lockItem.getOwnerName()).build());
        final Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put(PK_PATH_EXPRESSION_VARIABLE, partitionKeyName);
        expressionAttributeNames.put(LEASE_DURATION_PATH_VALUE_EXPRESSION_VARIABLE, LEASE_DURATION);
        expressionAttributeNames.put(RVN_PATH_EXPRESSION_VARIABLE, RECORD_VERSION_NUMBER);
        expressionAttributeNames.put(OWNER_NAME_PATH_EXPRESSION_VARIABLE, OWNER_NAME);
        if (this.sortKeyName.isPresent()) {
            conditionalExpression = PK_EXISTS_AND_SK_EXISTS_AND_OWNER_NAME_SAME_AND_RVN_SAME_CONDITION;
            expressionAttributeNames.put(SK_PATH_EXPRESSION_VARIABLE, sortKeyName.get());
        } else {
            conditionalExpression = PK_EXISTS_AND_OWNER_NAME_SAME_AND_RVN_SAME_CONDITION;
        }

        final String recordVersionNumber = this.generateRecordVersionNumber();

        //Set up update expression for UpdateItem.
        final String updateExpression;
        expressionAttributeValues.put(NEW_RVN_VALUE_EXPRESSION_VARIABLE, AttributeValue.builder().s(recordVersionNumber).build());
        expressionAttributeValues.put(LEASE_DURATION_VALUE_EXPRESSION_VARIABLE, AttributeValue.builder().s(String.valueOf(leaseDurationToEnsureInMilliseconds)).build());
        if (deleteData) {
            expressionAttributeNames.put(DATA_PATH_EXPRESSION_VARIABLE, DATA);
            updateExpression = UPDATE_LEASE_DURATION_AND_RVN_AND_REMOVE_DATA;
        } else if (options.getData().isPresent()) {
            expressionAttributeNames.put(DATA_PATH_EXPRESSION_VARIABLE, DATA);
            expressionAttributeValues.put(DATA_VALUE_EXPRESSION_VARIABLE, AttributeValue.builder().b(SdkBytes.fromByteBuffer(options.getData().get())).build());
            updateExpression = UPDATE_LEASE_DURATION_AND_RVN_AND_DATA;
        } else {
            updateExpression = UPDATE_LEASE_DURATION_AND_RVN;
        }

        final UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(getItemKeys(lockItem))
                .conditionExpression(conditionalExpression)
                .updateExpression(updateExpression)
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues).build();

        try {
            // Captured before the request is issued, so the timestamp handed to the local lock item is never
            // later than the moment the update was actually sent.
            final long lastUpdateOfLock = LockClientUtils.INSTANCE.millisecondTime();
            this.dynamoDB.updateItem(updateItemRequest);
            // Only reached when the conditional update succeeded: bring the local item in line with the table.
            lockItem.updateRecordVersionNumber(recordVersionNumber, lastUpdateOfLock, leaseDurationToEnsureInMilliseconds);
            if (deleteData) {
                lockItem.updateData(null);
            } else if (options.getData().isPresent()) {
                lockItem.updateData(options.getData().get());
            }
        } catch (final ConditionalCheckFailedException conditionalCheckFailedException) {
            logger.debug("Someone else acquired the lock, so we will stop heartbeating it", conditionalCheckFailedException);
            this.locks.remove(lockItem.getUniqueIdentifier());
            throw new LockNotGrantedException("Someone else acquired the lock, so we will stop heartbeating it", conditionalCheckFailedException);
        } catch (final AwsServiceException awsServiceException) {
            // awsErrorDetails() and sdkHttpResponse() are optional on the exception; guard them so that an
            // exception without HTTP details is rethrown as-is instead of being masked by a NullPointerException.
            if (holdLockOnServiceUnavailable
                    && awsServiceException.awsErrorDetails() != null
                    && awsServiceException.awsErrorDetails().sdkHttpResponse() != null
                    && awsServiceException.awsErrorDetails().sdkHttpResponse().statusCode() == HttpStatusCode.SERVICE_UNAVAILABLE) {
                // When DynamoDB service is unavailable, other threads may get the same exception and no thread may have the lock.
                // For systems which should always hold a lock on an item and it is okay for multiple threads to hold the lock,
                // the lookUpTime of local state can be updated to make it believe that it still has the lock.
                logger.info("DynamoDB Service Unavailable. Holding the lock.");
                lockItem.updateLookUpTime(LockClientUtils.INSTANCE.millisecondTime());
            } else {
                throw awsServiceException;
            }
        }
    }
}