in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncer.java [707:748]
/**
 * Deletes the lease of a closed shard once it is no longer needed.
 *
 * <p>The lease is deleted only when all of the following hold:
 * <ul>
 *   <li>a lease for {@code closedShardId} is present in {@code trackedLeases} and its
 *       checkpoint is {@code SHARD_END};</li>
 *   <li>every id in {@code childShardIds} has a lease in {@code trackedLeases};</li>
 *   <li>every one of those child leases is itself check-pointed at {@code SHARD_END};</li>
 *   <li>the creation time derived from the shard id is at least {@code MIN_LEASE_RETENTION}
 *       in the past. If the creation time cannot be derived (a {@link RuntimeException} is
 *       thrown), this last condition is skipped and the other conditions alone decide.</li>
 * </ul>
 *
 * <p>A lease whose checkpoint is {@code null} is treated as not being at {@code SHARD_END},
 * so it never makes a parent lease eligible for deletion.
 *
 * @param closedShardId id of the closed (parent) shard whose lease may be removed
 * @param childShardIds ids of the child shards of {@code closedShardId}
 * @param trackedLeases leases currently known, keyed by shard id
 * @param leaseManager  used to delete the parent lease when it is eligible
 * @throws DependencyException            declared by this method; presumably raised by
 *                                        {@code leaseManager.deleteLease} - confirm against
 *                                        the lease manager contract
 * @throws InvalidStateException          see above
 * @throws ProvisionedThroughputException see above
 */
synchronized void cleanupLeaseForClosedShard(String closedShardId,
        Set<String> childShardIds,
        Map<String, KinesisClientLease> trackedLeases,
        ILeaseManager<KinesisClientLease> leaseManager)
        throws DependencyException, InvalidStateException, ProvisionedThroughputException {
    KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId);

    // Collect the leases that exist for the child shards; a missing child lease
    // makes the size comparison below fail and therefore blocks the cleanup.
    List<KinesisClientLease> childShardLeases = new ArrayList<>();
    for (String childShardId : childShardIds) {
        KinesisClientLease childLease = trackedLeases.get(childShardId);
        if (childLease != null) {
            childShardLeases.add(childLease);
        }
    }

    // Constant-first equals: a null checkpoint simply means "not at SHARD_END"
    // instead of aborting the whole cleanup with a NullPointerException.
    if ((leaseForClosedShard != null)
            && ExtendedSequenceNumber.SHARD_END.equals(leaseForClosedShard.getCheckpoint())
            && (childShardLeases.size() == childShardIds.size())) {
        boolean okayToDelete = true;
        for (KinesisClientLease lease : childShardLeases) {
            if (!ExtendedSequenceNumber.SHARD_END.equals(lease.getCheckpoint())) {
                okayToDelete = false; // if any child is still being processed, don't delete lease for parent
                break;
            }
        }

        try {
            if (Instant.now().isBefore(getShardCreationTime(closedShardId).plus(MIN_LEASE_RETENTION))) {
                okayToDelete = false; // if parent was created within lease retention period, don't delete lease for parent
            }
        } catch (RuntimeException e) {
            // Best effort: without a creation time the retention check is skipped and
            // the decision made from the checkpoints above stands.
            LOG.info("Could not extract creation time from ShardId [" + closedShardId +"]");
            // Pass the exception as the throwable argument so the stack trace is kept.
            LOG.debug("Could not extract creation time from ShardId [" + closedShardId + "]", e);
        }

        if (okayToDelete) {
            LOG.info("Deleting lease for shard " + leaseForClosedShard.getLeaseKey()
                    + " as it is eligible for cleanup - its child shard is check-pointed at SHARD_END.");
            leaseManager.deleteLease(leaseForClosedShard);
        }
    }
}