public LockItem acquireLock()

in src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java [407:569]


    /**
     * Attempts to acquire the lock identified by the partition key (and optional sort key) carried in
     * {@code options}, polling the lock table until the lock is obtained or the wait budget is used up.
     *
     * <p>Outline of the behavior:
     * <ol>
     *   <li>If {@code options.getReentrant()} is set and this client already tracks the lock, the tracked
     *       lock is returned immediately when it is present and not expired.</li>
     *   <li>The additional attributes are rejected if they use one of the attribute names this client writes
     *       itself (partition key, owner name, lease duration, record version number, data, and the sort key
     *       name when one is configured).</li>
     *   <li>The wait budget starts at {@code DEFAULT_BUFFER_MS}, or at {@code additionalTimeToWaitForLock}
     *       converted to milliseconds when that option is set. The first time a held lock is observed, that
     *       lock's lease duration is added to the budget (saturating at {@code Long.MAX_VALUE}).</li>
     *   <li>Each loop iteration reads the lock item. A missing lock or a released lock is written right away.
     *       Otherwise the observed lock is remembered, and it is only taken over on a later iteration when its
     *       record version number is unchanged and it has expired.</li>
     *   <li>Between iterations the thread sleeps for the refresh period ({@code DEFAULT_BUFFER_MS} unless
     *       {@code refreshPeriod} is set).</li>
     * </ol>
     *
     * @param options the acquisition options; must not be {@code null} and must carry a non-null partition key
     * @return the acquired lock item
     * @throws NullPointerException     if {@code options} or its partition key is {@code null}, or if a wait time
     *                                  or refresh period is supplied without a time unit
     * @throws IllegalArgumentException if the additional attributes contain a reserved attribute name
     * @throws LockNotGrantedException  if the lock could not be acquired: a conditional write failed, the
     *                                  provisioned throughput was exceeded, the lock does not exist while
     *                                  {@code acquireOnlyIfLockAlreadyExists} is set, or the wait budget elapsed
     * @throws InterruptedException     if the thread is interrupted while sleeping between polls
     */
    public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGrantedException, InterruptedException {
        Objects.requireNonNull(options, "Cannot acquire lock when options is null");
        Objects.requireNonNull(options.getPartitionKey(), "Cannot acquire lock when key is null");

        final String key = options.getPartitionKey();
        final Optional<String> sortKey = options.getSortKey();

        if (options.getReentrant() && hasLock(key, sortKey)) { // Call hasLock() to avoid making a db call when the client does not own the lock.
            Optional<LockItem> lock = getLock(key, sortKey);
            if (lock.isPresent() && !lock.get().isExpired()) {
                return lock.get();
            }
        }

        // Callers may not supply attributes whose names collide with the ones this client manages itself.
        final Map<String, AttributeValue> additionalAttributes = options.getAdditionalAttributes();
        final boolean usesReservedAttributeName = additionalAttributes.containsKey(this.partitionKeyName)
            || additionalAttributes.containsKey(OWNER_NAME)
            || additionalAttributes.containsKey(LEASE_DURATION)
            || additionalAttributes.containsKey(RECORD_VERSION_NUMBER)
            || additionalAttributes.containsKey(DATA)
            || (this.sortKeyName.isPresent() && additionalAttributes.containsKey(this.sortKeyName.get()));
        if (usesReservedAttributeName) {
            // List the sort key name as well when one is configured, since it is rejected by the check above.
            final String reservedNames = String.format("%s, %s, %s, %s, %s", this.partitionKeyName, OWNER_NAME, LEASE_DURATION,
                RECORD_VERSION_NUMBER, DATA) + this.sortKeyName.map(sortKeyAttribute -> ", " + sortKeyAttribute).orElse("");
            throw new IllegalArgumentException("Additional attribute cannot be one of the following types: " + reservedNames);
        }

        long millisecondsToWait = DEFAULT_BUFFER_MS;
        if (options.getAdditionalTimeToWaitForLock() != null) {
            Objects.requireNonNull(options.getTimeUnit(), "timeUnit must not be null if additionalTimeToWaitForLock is non-null");
            millisecondsToWait = options.getTimeUnit().toMillis(options.getAdditionalTimeToWaitForLock());
        }

        long refreshPeriodInMilliseconds = DEFAULT_BUFFER_MS;
        if (options.getRefreshPeriod() != null) {
            Objects.requireNonNull(options.getTimeUnit(), "timeUnit must not be null if refreshPeriod is non-null");
            refreshPeriodInMilliseconds = options.getTimeUnit().toMillis(options.getRefreshPeriod());
        }

        final boolean deleteLockOnRelease = options.getDeleteLockOnRelease();
        final boolean replaceData = options.getReplaceData();

        final Optional<SessionMonitor> sessionMonitor = options.getSessionMonitor();
        if (sessionMonitor.isPresent()) {
            sessionMonitorArgsValidate(sessionMonitor.get().getSafeTimeMillis(), this.heartbeatPeriodInMilliseconds, this.leaseDurationInMilliseconds);
        }
        // Start of the acquire call; every timeout check below is measured against this instant.
        final long currentTimeMillis = LockClientUtils.INSTANCE.millisecondTime();

        /*
         * This is the lock we are trying to acquire. If it already exists, then we can try to steal it if it does not get updated
         * after its LEASE_DURATION expires.
         */
        LockItem lockTryingToBeAcquired = null;
        boolean alreadySleptOnceForOneLeasePeriod = false;

        final GetLockOptions getLockOptions = new GetLockOptions.GetLockOptionsBuilder(key)
                .withSortKey(sortKey.orElse(null))
                .withDeleteLockOnRelease(deleteLockOnRelease)
                .build();

        while (true) {
            try {
                try {
                    logger.trace("Call GetItem to see if the lock for " + partitionKeyName + " =" + key + ", " + this.sortKeyName + "=" + sortKey + " exists in the table");
                    final Optional<LockItem> existingLock = this.getLockFromDynamoDB(getLockOptions);

                    if (options.getAcquireOnlyIfLockAlreadyExists() && !existingLock.isPresent()) {
                        throw new LockNotGrantedException("Lock does not exist.");
                    }

                    if (options.shouldSkipBlockingWait() && existingLock.isPresent() && !existingLock.get().isExpired()) {
                        /*
                         * The lock is being held by some one and is still not expired. And the caller explicitly said not to perform a blocking wait;
                         * We will throw back a lock not grant exception, so that the caller can retry if needed.
                         */
                        // NOTE(review): whether this propagates straight to the caller or goes through the
                        // LockNotGrantedException handler below depends on the exception's class hierarchy,
                        // which is not visible here - confirm.
                        throw new LockCurrentlyUnavailableException("The lock being requested is being held by another client.");
                    }

                    // Decide which payload to write: the caller's data when replacing, otherwise the existing data.
                    Optional<ByteBuffer> newLockData = Optional.empty();
                    if (replaceData) {
                        newLockData = options.getData();
                    } else if (existingLock.isPresent()) {
                        newLockData = existingLock.get().getData();
                    }

                    if (!newLockData.isPresent()) {
                        newLockData = options.getData(); // If there is no existing data, we write the input data to the lock.
                    }

                    // Build the item to write; client-managed attributes are put after (and so override) the additional ones.
                    final Map<String, AttributeValue> item = new HashMap<>();
                    item.putAll(options.getAdditionalAttributes());
                    item.put(this.partitionKeyName, AttributeValue.builder().s(key).build());
                    item.put(OWNER_NAME, AttributeValue.builder().s(this.ownerName).build());
                    item.put(LEASE_DURATION, AttributeValue.builder().s(String.valueOf(this.leaseDurationInMilliseconds)).build());
                    final String recordVersionNumber = this.generateRecordVersionNumber();
                    item.put(RECORD_VERSION_NUMBER, AttributeValue.builder().s(String.valueOf(recordVersionNumber)).build());
                    sortKeyName.ifPresent(sortKeyName -> item.put(sortKeyName, AttributeValue.builder().s(sortKey.get()).build()));
                    newLockData.ifPresent(byteBuffer -> item.put(DATA, AttributeValue.builder().b(SdkBytes.fromByteBuffer(byteBuffer)).build()));

                    //if the existing lock does not exist or exists and is released
                    if (!existingLock.isPresent() && !options.getAcquireOnlyIfLockAlreadyExists()) {
                        return upsertAndMonitorNewLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData,
                            item, recordVersionNumber);
                    } else if (existingLock.isPresent() && existingLock.get().isReleased()) {
                        return upsertAndMonitorReleasedLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, existingLock,
                            newLockData, item, recordVersionNumber);
                    }
                    // Both branches above return, and the "lock does not exist" cases were handled earlier,
                    // so from here on existingLock is present and not released.
                    if (lockTryingToBeAcquired == null) {
                        // First time a held lock is observed in this call: lockTryingToBeAcquired is only ever
                        // assigned non-null values, so this branch is not entered again afterwards.
                        /*
                         * Someone else has the lock, and they have the lock for LEASE_DURATION time. At this point, we need
                         * to wait at least LEASE_DURATION milliseconds before we can try to acquire the lock.
                         */
                        lockTryingToBeAcquired = existingLock.get();
                        if (!alreadySleptOnceForOneLeasePeriod) {
                            alreadySleptOnceForOneLeasePeriod = true;
                            // Saturate instead of overflowing: TimeUnit.toMillis() caps at Long.MAX_VALUE, so a very
                            // large additionalTimeToWaitForLock plus the lease duration would otherwise wrap to a
                            // negative budget and make the timeout checks below fire immediately.
                            final long leaseDurationToWait = existingLock.get().getLeaseDuration();
                            if (leaseDurationToWait > 0 && millisecondsToWait > Long.MAX_VALUE - leaseDurationToWait) {
                                millisecondsToWait = Long.MAX_VALUE;
                            } else {
                                millisecondsToWait += leaseDurationToWait;
                            }
                        }
                    } else {
                        if (lockTryingToBeAcquired.getRecordVersionNumber().equals(existingLock.get().getRecordVersionNumber())) {
                            /* If the version numbers match, then we can acquire the lock, assuming it has already expired */
                            if (lockTryingToBeAcquired.isExpired()) {
                                return upsertAndMonitorExpiredLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, existingLock, newLockData, item,
                                    recordVersionNumber);
                            }
                        } else {
                            /*
                             * If the version number changed since we last queried the lock, then we need to update
                             * lockTryingToBeAcquired as the lock has been refreshed since we last checked
                             */
                            lockTryingToBeAcquired = existingLock.get();
                        }
                    }
                } catch (final ConditionalCheckFailedException conditionalCheckFailedException) {
                    /* Someone else acquired the lock while we tried to do so, so we throw an exception */
                    logger.debug("Someone else acquired the lock", conditionalCheckFailedException);
                    throw new LockNotGrantedException("Could not acquire lock because someone else acquired it: ", conditionalCheckFailedException);
                } catch (ProvisionedThroughputExceededException provisionedThroughputExceededException) {
                    /* Request exceeded maximum allowed provisioned throughput for the table
                     * or for one or more global secondary indexes.
                     */
                    logger.debug("Maximum allowed provisioned throughput for the table exceeded", provisionedThroughputExceededException);
                    throw new LockNotGrantedException("Could not acquire lock because provisioned throughput for the table exceeded", provisionedThroughputExceededException);
                } catch (final SdkClientException sdkClientException) {
                    /* This indicates that we were unable to successfully connect and make a service call to DDB. Often
                     * indicative of a network failure, such as a socket timeout. We retry if still within the time we
                     * can wait to acquire the lock.
                     */
                    logger.warn("Could not acquire lock because of a client side failure in talking to DDB", sdkClientException);
                }
            } catch (final LockNotGrantedException x) {
                // A failed attempt is only surfaced once the wait budget is spent; until then we fall through and retry.
                if (LockClientUtils.INSTANCE.millisecondTime() - currentTimeMillis > millisecondsToWait) {
                    logger.debug("This client waited more than millisecondsToWait=" + millisecondsToWait
                        + " ms since the beginning of this acquire call.", x);
                    throw x;
                }
            }
            // Give up once the total elapsed time exceeds the wait budget.
            if (LockClientUtils.INSTANCE.millisecondTime() - currentTimeMillis > millisecondsToWait) {
                throw new LockNotGrantedException("Didn't acquire lock after sleeping for " + (LockClientUtils.INSTANCE.millisecondTime() - currentTimeMillis) + " milliseconds");
            }
            logger.trace("Sleeping for a refresh period of " + refreshPeriodInMilliseconds + " ms");
            Thread.sleep(refreshPeriodInMilliseconds);
        }
    }