in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncer.java [657:691]
private synchronized void cleanupLeasesOfFinishedShards(Collection<KinesisClientLease> currentLeases,
Map<String, Shard> shardIdToShardMap,
Map<String, Set<String>> shardIdToChildShardIdsMap,
List<KinesisClientLease> trackedLeases,
ILeaseManager<KinesisClientLease> leaseManager)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
LOG.info("cleanupLeasesOfFinishedShards: begin");
Set<String> shardIdsOfClosedShards = new HashSet<>();
List<KinesisClientLease> leasesOfClosedShards = new ArrayList<>();
for (KinesisClientLease lease : currentLeases) {
if (lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
shardIdsOfClosedShards.add(lease.getLeaseKey());
leasesOfClosedShards.add(lease);
}
}
if (!leasesOfClosedShards.isEmpty()) {
assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap,
shardIdToChildShardIdsMap,
shardIdsOfClosedShards);
Comparator<? super KinesisClientLease> startingSequenceNumberComparator
= new StartingSequenceNumberAndShardIdBasedComparator(shardIdToShardMap);
Collections.sort(leasesOfClosedShards, startingSequenceNumberComparator);
Map<String, KinesisClientLease> trackedLeaseMap = constructShardIdToKCLLeaseMap(trackedLeases);
for (KinesisClientLease leaseOfClosedShard : leasesOfClosedShards) {
String closedShardId = leaseOfClosedShard.getLeaseKey();
Set<String> childShardIds = shardIdToChildShardIdsMap.get(closedShardId);
if ((closedShardId != null) && (childShardIds != null) && (!childShardIds.isEmpty())) {
cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
}
}
}
LOG.info("cleanupLeasesOfFinishedShards: done");
}