in amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java [417:536]
/**
 * Decides whether {@code shardId} is a descendant of a shard that is already covered by a lease and,
 * while walking up the shard hierarchy, registers new leases for ancestor shards that need one.
 *
 * <p>Outcome by case:
 * <ul>
 *   <li>A result previously stored in {@code memoizationContext} for this shard is returned as-is.</li>
 *   <li>A {@code null} shard id, or one missing from {@code shardIdToShardMapOfAllKinesisShards}, is not a
 *       descendant.</li>
 *   <li>A shard that already has a current lease is a descendant; nothing is added for its ancestors.</li>
 *   <li>Otherwise every parent is evaluated recursively. The shard is a descendant when at least one parent
 *       is a descendant or has been flagged for lease creation. In that case leases for parents lacking a
 *       current lease are created (where required) and their checkpoints are set. If no parent qualifies
 *       and the initial position is TRIM_HORIZON or AT_TIMESTAMP, the shard itself is flagged for lease
 *       creation.</li>
 * </ul>
 * Except when a memoized value is returned, the computed answer is stored in {@code memoizationContext}
 * before returning.
 *
 * @param shardId shard to evaluate; may be {@code null}
 * @param initialPosition initial position used to choose checkpoints for newly tracked parent leases
 * @param shardIdsOfCurrentLeases ids of shards that already have a lease
 * @param shardIdToShardMapOfAllKinesisShards lookup of every known shard by id
 * @param shardIdToLeaseMapOfNewShards leases created so far; new parent leases are added to this map
 * @param memoizationContext cache of per-shard "is descendant" and "should create lease" answers
 * @param multiStreamArgs multi-stream settings; selects which kind of lease is created
 * @return {@code true} if the shard is considered a descendant, {@code false} otherwise
 */
static boolean checkIfDescendantAndAddNewLeasesForAncestors(final String shardId,
        final InitialPositionInStreamExtended initialPosition, final Set<String> shardIdsOfCurrentLeases,
        final Map<String, Shard> shardIdToShardMapOfAllKinesisShards,
        final Map<String, Lease> shardIdToLeaseMapOfNewShards, final MemoizationContext memoizationContext,
        final MultiStreamArgs multiStreamArgs) {
    final String streamIdentifier = getStreamIdentifier(multiStreamArgs);

    // Reuse the answer from an earlier visit of this shard, if there is one.
    final Boolean memoizedAnswer = memoizationContext.isDescendant(shardId);
    if (memoizedAnswer != null) {
        return memoizedAnswer;
    }

    // A missing or unknown shard cannot be a descendant of anything.
    if (shardId == null || !shardIdToShardMapOfAllKinesisShards.containsKey(shardId)) {
        memoizationContext.setIsDescendant(shardId, false);
        return false;
    }

    // The shard already has a lease, so it is a descendant of a current shard. Its ancestors were
    // taken care of when that lease was created, so there is nothing to add here.
    if (shardIdsOfCurrentLeases.contains(shardId)) {
        memoizationContext.setIsDescendant(shardId, true);
        return true;
    }

    final Set<String> parentShardIds = getParentShardIds(
            shardIdToShardMapOfAllKinesisShards.get(shardId), shardIdToShardMapOfAllKinesisShards);

    // Parents that are descendants themselves, or that were flagged for lease creation anyway
    // (e.g. when processing from TRIM_HORIZON or AT_TIMESTAMP).
    final Set<String> descendantParentShardIds = new HashSet<>();
    for (final String parentShardId : parentShardIds) {
        final boolean parentIsDescendant = checkIfDescendantAndAddNewLeasesForAncestors(parentShardId,
                initialPosition, shardIdsOfCurrentLeases, shardIdToShardMapOfAllKinesisShards,
                shardIdToLeaseMapOfNewShards, memoizationContext, multiStreamArgs);
        if (!parentIsDescendant && !memoizationContext.shouldCreateLease(parentShardId)) {
            log.debug("{} : Parent shard {} is NOT a descendant.", streamIdentifier, parentShardId);
            continue;
        }
        descendantParentShardIds.add(parentShardId);
        log.debug("{} : Parent shard {} is a descendant.", streamIdentifier, parentShardId);
    }

    // One qualifying parent is enough to make the current shard a descendant.
    final boolean isDescendant = !descendantParentShardIds.isEmpty();

    if (isDescendant) {
        // Make sure every parent without a current lease is tracked where required.
        for (final String parentShardId : parentShardIds) {
            if (shardIdsOfCurrentLeases.contains(parentShardId)) {
                continue;
            }
            final boolean parentQualified = descendantParentShardIds.contains(parentShardId);

            /*
             * When no new lease exists yet for the parent, create one in either of these cases:
             *  - the parent was flagged for lease creation by an earlier recursive call (possible when
             *    processing from TRIM_HORIZON or AT_TIMESTAMP);
             *  - the parent is not a descendant although the current shard is, which makes the parent the
             *    oldest shard of this lineage without an ancestor in the lease table (the adjacent parent
             *    is necessarily a descendant and therefore already covered).
             */
            Lease parentLease = shardIdToLeaseMapOfNewShards.get(parentShardId);
            if (parentLease == null
                    && (memoizationContext.shouldCreateLease(parentShardId) || !parentQualified)) {
                log.debug("{} : Need to create a lease for shardId {}", streamIdentifier, parentShardId);
                if (multiStreamArgs.isMultiStreamMode()) {
                    parentLease = newKCLMultiStreamLease(
                            shardIdToShardMapOfAllKinesisShards.get(parentShardId),
                            multiStreamArgs.streamIdentifier());
                } else {
                    parentLease = newKCLLease(shardIdToShardMapOfAllKinesisShards.get(parentShardId));
                }
                shardIdToLeaseMapOfNewShards.put(parentShardId, parentLease);
            }
            if (parentLease == null) {
                continue;
            }

            /*
             * Checkpoint selection: a qualifying parent starts at TRIM_HORIZON unless the initial position
             * is AT_TIMESTAMP; every other case uses the checkpoint derived from the initial position.
             * AT_TIMESTAMP leases are added just like TRIM_HORIZON ones, but only records whose server-side
             * timestamp is at or after the requested timestamp are returned.
             *
             * Shard structure (each level depicts a stream segment):
             *   0 1 2 3 4   5   - shards till epoch 102
             *   \ / \ / |   |
             *    6   7  4   5   - shards from epoch 103 - 205
             *    \  /   |  / \
             *     8     4 9  10 - shards from epoch 206 (open - no ending sequenceNumber)
             *
             * Current leases: (4, 5, 7)
             *
             * With AT_TIMESTAMP and timestamp 206, leases are created for shards 0 and 1 (checkpoint
             * AT_TIMESTAMP) even though their epoch is below 206. Once processed, their checkpoints move
             * to SHARD_END and the leases are deleted, because they hold no records at/after 206.
             * Processing then continues with the descendant shards whose epoch is at/after 206, returning
             * the records that satisfy the timestamp requirement.
             */
            if (!parentQualified
                    || initialPosition.getInitialPositionInStream()
                            .equals(InitialPositionInStream.AT_TIMESTAMP)) {
                parentLease.checkpoint(convertToCheckpoint(initialPosition));
            } else {
                parentLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
            }
        }
    } else {
        // Not a descendant, yet the shard must still be picked up when the customer processes the whole
        // stream (TRIM_HORIZON) or starts AT_TIMESTAMP; in the latter case only records with a server-side
        // timestamp at or after the requested one are returned.
        if (initialPosition.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)
                || initialPosition.getInitialPositionInStream()
                        .equals(InitialPositionInStream.AT_TIMESTAMP)) {
            memoizationContext.setShouldCreateLease(shardId, true);
        }
    }

    memoizationContext.setIsDescendant(shardId, isDescendant);
    return isDescendant;
}