static boolean checkIfDescendantAndAddNewLeasesForAncestors()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java [417:536]


    static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId,
            final InitialPositionInStreamExtended initialPosition, final Set<String> shardIdsOfCurrentLeases,
            final Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
            final Map<String, Lease> shardIdToLeaseMapOfNewShards, final MemoizationContext memoizationContext,
            final MultiStreamArgs multiStreamArgs) {
        final String streamIdentifier = getStreamIdentifier(multiStreamArgs);
        final Boolean previousValue = memoizationContext.isDescendant(shardId);
        if (previousValue != null) {
            return previousValue;
        }

        boolean isDescendant = false;
        final Set<String> descendantParentShardIds = new HashSet<>();

        if (shardId != null && shardIdToShardMapOfAllKinesisShards.containsKey(shardId)) {
            if (shardIdsOfCurrentLeases.contains(shardId)) {
                // This shard is a descendant of a current shard.
                isDescendant = true;
                // We don't need to add leases of its ancestors,
                // because we'd have done it when creating a lease for this shard.
            } else {

                final Shard shard = shardIdToShardMapOfAllKinesisShards.get(shardId);
                final Set<String> parentShardIds = getParentShardIds(shard, shardIdToShardMapOfAllKinesisShards);
                for (String parentShardId : parentShardIds) {
                    // Check if the parent is a descendant, and include its ancestors. Or, if the parent is NOT a
                    // descendant but we should create a lease for it anyway (e.g. to include in processing from
                    // TRIM_HORIZON or AT_TIMESTAMP). If either is true, then we mark the current shard as a descendant.
                    final boolean isParentDescendant = checkIfDescendantAndAddNewLeasesForAncestors(parentShardId,
                            initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards,
                            shardIdToLeaseMapOfNewShards, memoizationContext, multiStreamArgs);
                    if (isParentDescendant || memoizationContext.shouldCreateLease(parentShardId)) {
                        isDescendant = true;
                        descendantParentShardIds.add(parentShardId);
                        log.debug("{} : Parent shard {} is a descendant.", streamIdentifier, parentShardId);
                    } else {
                        log.debug("{} : Parent shard {} is NOT a descendant.", streamIdentifier, parentShardId);
                    }
                }

                // If this is a descendant, create leases for its parent shards (if they don't exist)
                if (isDescendant) {
                    for (String parentShardId : parentShardIds) {
                        if (!shardIdsOfCurrentLeases.contains(parentShardId)) {
                            Lease lease = shardIdToLeaseMapOfNewShards.get(parentShardId);

                            /**
                             * If the lease for the parent shard does not already exist, there are two cases in which we
                             * would want to create it:
                             * - If we have already marked the parentShardId for lease creation in a prior recursive
                             *   call. This could happen if we are trying to process from TRIM_HORIZON or AT_TIMESTAMP.
                             * - If the parent shard is not a descendant but the current shard is a descendant, then
                             *   the parent shard is the oldest shard in the shard hierarchy that does not have an
                             *   ancestor in the lease table (the adjacent parent is necessarily a descendant, and
                             *   therefore covered in the lease table). So we should create a lease for the parent.
                             */
                            if (lease == null) {
                                if (memoizationContext.shouldCreateLease(parentShardId) ||
                                        !descendantParentShardIds.contains(parentShardId)) {
                                    log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, parentShardId);
                                    lease = multiStreamArgs.isMultiStreamMode() ?
                                            newKCLMultiStreamLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId),
                                                    multiStreamArgs.streamIdentifier()) :
                                            newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId));
                                    shardIdToLeaseMapOfNewShards.put(parentShardId, lease);
                                }
                            }

                            /**
                             * If the shard is a descendant and the specified initial position is AT_TIMESTAMP, then the
                             * checkpoint should be set to AT_TIMESTAMP, else to TRIM_HORIZON. For AT_TIMESTAMP, we will
                             * add a lease just like we do for TRIM_HORIZON. However we will only return back records
                             * with server-side timestamp at or after the specified initial position timestamp.
                             *
                             * Shard structure (each level depicts a stream segment):
                             * 0 1 2 3 4   5   - shards till epoch 102
                             * \ / \ / |   |
                             *  6   7  4   5   - shards from epoch 103 - 205
                             *   \ /   |  /\
                             *    8    4 9  10 - shards from epoch 206 (open - no ending sequenceNumber)
                             *
                             * Current leases: (4, 5, 7)
                             *
                             * For the above example, suppose the initial position in stream is set to AT_TIMESTAMP with
                             * timestamp value 206. We will then create new leases for all the shards 0 and 1 (with
                             * checkpoint set AT_TIMESTAMP), even though these ancestor shards have an epoch less than
                             * 206. However as we begin processing the ancestor shards, their checkpoints would be
                             * updated to SHARD_END and their leases would then be deleted since they won't have records
                             * with server-side timestamp at/after 206. And after that we will begin processing the
                             * descendant shards with epoch at/after 206 and we will return the records that meet the
                             * timestamp requirement for these shards.
                             */
                            if (lease != null) {
                                if (descendantParentShardIds.contains(parentShardId)
                                        && !initialPosition.getInitialPositionInStream()
                                        .equals(InitialPositionInStream.AT_TIMESTAMP)) {
                                    lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
                                } else {
                                    lease.checkpoint(convertToCheckpoint(initialPosition));
                                }
                            }
                        }
                    }
                } else {
                    // This shard is not a descendant, but should still be included if the customer wants to process all
                    // records in the stream or if the initial position is AT_TIMESTAMP. For AT_TIMESTAMP, we will add a
                    // lease just like we do for TRIM_HORIZON. However we will only return back records with server-side
                    // timestamp at or after the specified initial position timestamp.
                    if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)
                            || initialPosition.getInitialPositionInStream()
                                .equals(InitialPositionInStream.AT_TIMESTAMP)) {
                        memoizationContext.setShouldCreateLease(shardId, true);
                    }
                }
            }
        }

        memoizationContext.setIsDescendant(shardId, isDescendant);
        return isDescendant;
    }