List determineNewLeasesToCreate()

in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncer.java [376:453]


    /**
     * Works out which leases have to be created so that every open shard without a lease gets one.
     *
     * <p>Open shards whose id already appears as a lease key in {@code currentLeases}, or whose id is
     * listed in {@code inconsistentShardIds}, are skipped. For each remaining open shard a lease is built
     * with {@code newKCLLease}, {@code checkIfDescendantAndAddNewLeasesForAncestors} is invoked with the
     * map of pending leases (so it can contribute leases for ancestors), and the checkpoint of the new
     * lease is chosen from {@code initialPosition}. The collected leases are returned sorted by
     * {@code StartingSequenceNumberAndShardIdBasedComparator}.
     *
     * @param shards shards to evaluate; open shards are selected from this list via {@code getOpenShards}
     * @param currentLeases leases that already exist; their lease keys are compared against shard ids
     * @param initialPosition initial position used to derive the checkpoint of newly created leases
     * @param inconsistentShardIds ids of shards for which no lease is to be created
     * @return the leases to create, sorted with {@code StartingSequenceNumberAndShardIdBasedComparator}
     */
    List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
        List<KinesisClientLease> currentLeases,
        InitialPositionInStreamExtended initialPosition,
        Set<String> inconsistentShardIds) {
        LOG.info("determineNewLeasesToCreate: begin");
        Map<String, KinesisClientLease> pendingLeasesByShardId = new HashMap<>();
        Map<String, Shard> allShardsById = constructShardIdToShardMap(shards);

        // Collect the keys of the leases we already hold so they can be skipped below.
        Set<String> leasedShardIds = new HashSet<>();
        for (KinesisClientLease existingLease : currentLeases) {
            leasedShardIds.add(existingLease.getLeaseKey());
            LOG.debug("Existing lease: " + existingLease);
        }

        // Shared across all calls to the ancestor check within this invocation.
        Map<String, Boolean> descendantMemo = new HashMap<>();

        // Walk the open shards and pick out those that still need a lease.
        for (Shard openShard : getOpenShards(shards)) {
            String shardId = openShard.getShardId();
            LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors.");

            if (leasedShardIds.contains(shardId)) {
                LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease");
                continue;
            }
            if (inconsistentShardIds.contains(shardId)) {
                LOG.info("shardId " + shardId + " is an inconsistent child.  Not creating a lease");
                continue;
            }

            LOG.debug("Need to create a lease for shardId " + shardId);
            KinesisClientLease candidateLease = newKCLLease(openShard);
            boolean descendsFromTrackedShard = checkIfDescendantAndAddNewLeasesForAncestors(
                shardId,
                initialPosition,
                leasedShardIds,
                allShardsById,
                pendingLeasesByShardId,
                descendantMemo);

            /*
             * Checkpoint selection:
             *  - a descendant shard with any initial position other than AT_TIMESTAMP starts at TRIM_HORIZON;
             *  - every other case (not a descendant, or AT_TIMESTAMP) uses the checkpoint derived from the
             *    requested initial position.
             * For AT_TIMESTAMP a lease is added just as for TRIM_HORIZON, but only records with a server-side
             * timestamp at or after the requested timestamp are meant to be returned.
             *
             * Shard structure (each level depicts a stream segment):
             * 0 1 2 3 4   5   - shards till epoch 102
             * \ / \ / |   |
             *  6   7  4   5   - shards from epoch 103 - 205
             *   \ /   |  /\
             *    8    4 9  10 - shards from epoch 206 (open - no ending sequenceNumber)
             *
             * Current leases: empty set
             *
             * Example: with the structure above and initial position AT_TIMESTAMP with timestamp 206, leases
             * are created for all shards (checkpoint AT_TIMESTAMP), including ancestors with epoch below 206.
             * The intended outcome is that the ancestor shards reach SHARD_END without yielding records (none
             * have a server-side timestamp at/after 206) and their leases are then removed, after which the
             * descendant shards with epoch at/after 206 are processed and return the qualifying records.
             */
            if (!descendsFromTrackedShard
                || initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
                candidateLease.setCheckpoint(convertToCheckpoint(initialPosition));
            } else {
                candidateLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
            }
            LOG.debug("Set checkpoint of " + candidateLease.getLeaseKey() + " to " + candidateLease.getCheckpoint());
            pendingLeasesByShardId.put(shardId, candidateLease);
        }

        List<KinesisClientLease> sortedNewLeases = new ArrayList<>(pendingLeasesByShardId.values());
        Collections.sort(sortedNewLeases,
            new StartingSequenceNumberAndShardIdBasedComparator(allShardsById));
        LOG.info("determineNewLeasesToCreate: done");
        return sortedNewLeases;
    }