in src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java [407:569]
/**
 * Attempts to acquire the lock identified by the partition key (and optional sort key) in {@code options},
 * polling the lock table until the lock is obtained or the wait budget is exhausted.
 *
 * <p>Flow, as implemented below:</p>
 * <ol>
 * <li>If {@code options.getReentrant()} is set and {@code hasLock} reports that this client holds the lock,
 * the lock returned by {@code getLock} is handed back immediately when it is present and not expired.</li>
 * <li>The additional attributes are rejected if they reuse one of the attribute names this method writes
 * itself (partition key, owner name, lease duration, record version number, data, and the sort key name
 * when one is configured).</li>
 * <li>The wait budget starts at {@code additionalTimeToWaitForLock} (or {@code DEFAULT_BUFFER_MS} when unset)
 * and is extended once by the lease duration of the first unreleased lock observed. The poll interval is
 * {@code refreshPeriod} (or {@code DEFAULT_BUFFER_MS} when unset). Both values are converted with
 * {@code options.getTimeUnit()}.</li>
 * <li>Each poll reads the lock via {@code getLockFromDynamoDB}. A missing lock is written through
 * {@code upsertAndMonitorNewLock}, a released lock through {@code upsertAndMonitorReleasedLock}, and a lock
 * whose record version number has not changed since it was first observed, and which reports itself
 * expired, through {@code upsertAndMonitorExpiredLock}. If the record version number changed, the newly
 * read lock becomes the one being tracked.</li>
 * <li>{@code ConditionalCheckFailedException} and {@code ProvisionedThroughputExceededException} are converted
 * to {@link LockNotGrantedException}; {@code SdkClientException} is logged. In all of these cases the loop
 * retries after sleeping, unless the wait budget has been exceeded.</li>
 * </ol>
 *
 * @param options the acquisition options; must not be null and must carry a non-null partition key
 * @return the acquired lock, or the lock already held by this client on the reentrant path
 * @throws LockNotGrantedException if the lock could not be acquired before the wait budget was exceeded
 * @throws InterruptedException if the thread is interrupted while sleeping between polls
 * @throws NullPointerException if {@code options} or its partition key is null, or if a wait time or refresh
 *         period is supplied without a time unit
 * @throws IllegalArgumentException if the additional attributes use a reserved attribute name, or if the
 *         table is configured with a sort key but {@code options} carries no sort key value
 */
public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGrantedException, InterruptedException {
    Objects.requireNonNull(options, "Cannot acquire lock when options is null");
    Objects.requireNonNull(options.getPartitionKey(), "Cannot acquire lock when key is null");

    final String key = options.getPartitionKey();
    final Optional<String> sortKey = options.getSortKey();

    // Call hasLock() first to avoid making a db call when the client does not own the lock.
    if (options.getReentrant() && hasLock(key, sortKey)) {
        Optional<LockItem> lock = getLock(key, sortKey);
        if (lock.isPresent() && !lock.get().isExpired()) {
            return lock.get();
        }
    }

    // Reject additional attributes that would be overwritten by the attributes written further down.
    if (options.getAdditionalAttributes().containsKey(this.partitionKeyName)
        || options.getAdditionalAttributes().containsKey(OWNER_NAME)
        || options.getAdditionalAttributes().containsKey(LEASE_DURATION)
        || options.getAdditionalAttributes().containsKey(RECORD_VERSION_NUMBER)
        || options.getAdditionalAttributes().containsKey(DATA)
        || (this.sortKeyName.isPresent() && options.getAdditionalAttributes().containsKey(this.sortKeyName.get()))) {
        final StringBuilder reservedNames = new StringBuilder(
            String.format("%s, %s, %s, %s, %s", this.partitionKeyName, OWNER_NAME, LEASE_DURATION, RECORD_VERSION_NUMBER, DATA));
        // The sort key name is rejected by the check above, so it has to be listed in the message as well.
        this.sortKeyName.ifPresent(name -> reservedNames.append(", ").append(name));
        throw new IllegalArgumentException("Additional attribute cannot be one of the following types: " + reservedNames);
    }

    // Total time this call may spend before giving up; extended once inside the loop (see below).
    long millisecondsToWait = DEFAULT_BUFFER_MS;
    if (options.getAdditionalTimeToWaitForLock() != null) {
        Objects.requireNonNull(options.getTimeUnit(), "timeUnit must not be null if additionalTimeToWaitForLock is non-null");
        millisecondsToWait = options.getTimeUnit().toMillis(options.getAdditionalTimeToWaitForLock());
    }

    // Sleep time between two consecutive polls of the lock table.
    long refreshPeriodInMilliseconds = DEFAULT_BUFFER_MS;
    if (options.getRefreshPeriod() != null) {
        Objects.requireNonNull(options.getTimeUnit(), "timeUnit must not be null if refreshPeriod is non-null");
        refreshPeriodInMilliseconds = options.getTimeUnit().toMillis(options.getRefreshPeriod());
    }

    final boolean deleteLockOnRelease = options.getDeleteLockOnRelease();
    final boolean replaceData = options.getReplaceData();

    final Optional<SessionMonitor> sessionMonitor = options.getSessionMonitor();
    if (sessionMonitor.isPresent()) {
        sessionMonitorArgsValidate(sessionMonitor.get().getSafeTimeMillis(), this.heartbeatPeriodInMilliseconds, this.leaseDurationInMilliseconds);
    }

    // Start of the acquire call; every timeout check below is measured against this instant.
    final long currentTimeMillis = LockClientUtils.INSTANCE.millisecondTime();

    /*
     * This is the lock we are trying to acquire. If it already exists, then we can try to steal it if it does not get updated
     * after its LEASE_DURATION expires.
     */
    LockItem lockTryingToBeAcquired = null;

    final GetLockOptions getLockOptions = new GetLockOptions.GetLockOptionsBuilder(key)
        .withSortKey(sortKey.orElse(null))
        .withDeleteLockOnRelease(deleteLockOnRelease)
        .build();

    while (true) {
        try {
            try {
                logger.trace("Call GetItem to see if the lock for " + partitionKeyName + " =" + key + ", " + this.sortKeyName + "=" + sortKey + " exists in the table");
                final Optional<LockItem> existingLock = this.getLockFromDynamoDB(getLockOptions);

                if (options.getAcquireOnlyIfLockAlreadyExists() && !existingLock.isPresent()) {
                    throw new LockNotGrantedException("Lock does not exist.");
                }

                if (options.shouldSkipBlockingWait() && existingLock.isPresent() && !existingLock.get().isExpired()) {
                    /*
                     * The lock is being held by some one and is still not expired. And the caller explicitly said not to perform a blocking wait;
                     * We will throw back a lock not grant exception, so that the caller can retry if needed.
                     *
                     * NOTE(review): whether this exception is intercepted by the LockNotGrantedException handler below depends on
                     * its class hierarchy, which is not visible from this method - confirm.
                     */
                    throw new LockCurrentlyUnavailableException("The lock being requested is being held by another client.");
                }

                // Choose the data to store: the caller's data when replacing, otherwise the existing lock's data.
                Optional<ByteBuffer> newLockData = Optional.empty();
                if (replaceData) {
                    newLockData = options.getData();
                } else if (existingLock.isPresent()) {
                    newLockData = existingLock.get().getData();
                }

                if (!newLockData.isPresent()) {
                    newLockData = options.getData(); // If there is no existing data, we write the input data to the lock.
                }

                // Build the item to write. The reserved attributes are put after the additional ones.
                final Map<String, AttributeValue> item = new HashMap<>();
                item.putAll(options.getAdditionalAttributes());
                item.put(this.partitionKeyName, AttributeValue.builder().s(key).build());
                item.put(OWNER_NAME, AttributeValue.builder().s(this.ownerName).build());
                item.put(LEASE_DURATION, AttributeValue.builder().s(String.valueOf(this.leaseDurationInMilliseconds)).build());
                final String recordVersionNumber = this.generateRecordVersionNumber();
                item.put(RECORD_VERSION_NUMBER, AttributeValue.builder().s(String.valueOf(recordVersionNumber)).build());
                // Fail with a descriptive error instead of a bare NoSuchElementException when the table has a
                // sort key but the caller supplied no sort key value.
                this.sortKeyName.ifPresent(name -> item.put(name, AttributeValue.builder()
                    .s(sortKey.orElseThrow(() -> new IllegalArgumentException(
                        "Cannot acquire lock without a sort key value when the lock table is configured with sort key " + name)))
                    .build()));
                newLockData.ifPresent(byteBuffer -> item.put(DATA, AttributeValue.builder().b(SdkBytes.fromByteBuffer(byteBuffer)).build()));

                // Either the lock does not exist yet, or it exists and has been released.
                if (!existingLock.isPresent() && !options.getAcquireOnlyIfLockAlreadyExists()) {
                    return upsertAndMonitorNewLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData,
                        item, recordVersionNumber);
                } else if (existingLock.isPresent() && existingLock.get().isReleased()) {
                    return upsertAndMonitorReleasedLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, existingLock,
                        newLockData, item, recordVersionNumber);
                }

                // Both branches above return, and a missing lock either returned or threw earlier,
                // so existingLock is present from here on.
                if (lockTryingToBeAcquired == null) {
                    /*
                     * Someone else has the lock, and they have the lock for LEASE_DURATION time. At this point, we need
                     * to wait at least LEASE_DURATION milliseconds before we can try to acquire the lock.
                     *
                     * lockTryingToBeAcquired is never reset to null, so this branch runs at most once per call and the
                     * wait budget is extended by exactly one lease period.
                     */
                    lockTryingToBeAcquired = existingLock.get();
                    millisecondsToWait += existingLock.get().getLeaseDuration();
                } else {
                    if (lockTryingToBeAcquired.getRecordVersionNumber().equals(existingLock.get().getRecordVersionNumber())) {
                        /* If the version numbers match, then we can acquire the lock, assuming it has already expired */
                        if (lockTryingToBeAcquired.isExpired()) {
                            return upsertAndMonitorExpiredLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, existingLock, newLockData, item,
                                recordVersionNumber);
                        }
                    } else {
                        /*
                         * If the version number changed since we last queried the lock, then we need to update
                         * lockTryingToBeAcquired as the lock has been refreshed since we last checked
                         */
                        lockTryingToBeAcquired = existingLock.get();
                    }
                }
            } catch (final ConditionalCheckFailedException conditionalCheckFailedException) {
                /* Someone else acquired the lock while we tried to do so, so we throw an exception */
                logger.debug("Someone else acquired the lock", conditionalCheckFailedException);
                throw new LockNotGrantedException("Could not acquire lock because someone else acquired it: ", conditionalCheckFailedException);
            } catch (ProvisionedThroughputExceededException provisionedThroughputExceededException) {
                /* Request exceeded maximum allowed provisioned throughput for the table
                 * or for one or more global secondary indexes.
                 */
                logger.debug("Maximum allowed provisioned throughput for the table exceeded", provisionedThroughputExceededException);
                throw new LockNotGrantedException("Could not acquire lock because provisioned throughput for the table exceeded", provisionedThroughputExceededException);
            } catch (final SdkClientException sdkClientException) {
                /* This indicates that we were unable to successfully connect and make a service call to DDB. Often
                 * indicative of a network failure, such as a socket timeout. We retry if still within the time we
                 * can wait to acquire the lock.
                 */
                logger.warn("Could not acquire lock because of a client side failure in talking to DDB", sdkClientException);
            }
        } catch (final LockNotGrantedException x) {
            // Rethrow only once the wait budget is used up; otherwise fall through, sleep and poll again.
            if (LockClientUtils.INSTANCE.millisecondTime() - currentTimeMillis > millisecondsToWait) {
                logger.debug("This client waited more than millisecondsToWait=" + millisecondsToWait
                    + " ms since the beginning of this acquire call.", x);
                throw x;
            }
        }

        // Give up when the wait budget has been exceeded without acquiring the lock.
        if (LockClientUtils.INSTANCE.millisecondTime() - currentTimeMillis > millisecondsToWait) {
            throw new LockNotGrantedException("Didn't acquire lock after sleeping for " + (LockClientUtils.INSTANCE.millisecondTime() - currentTimeMillis) + " milliseconds");
        }

        logger.trace("Sleeping for a refresh period of " + refreshPeriodInMilliseconds + " ms");
        Thread.sleep(refreshPeriodInMilliseconds);
    }
}