in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncer.java [600:638]
/**
 * Deletes tracked leases that the lease cleanup validator flags as cleanup candidates.
 *
 * <p>The check runs in two passes. Leases are first screened against the IDs of the
 * {@code shards} passed in. Only if that pass yields candidates is the shard list fetched
 * again through {@code getShardList(kinesisProxy)}, so that recently created shards are
 * taken into account; a lease is deleted only when it is still a candidate against that
 * refreshed list.
 *
 * @param shards shard list used for the first screening pass
 * @param trackedLeases leases to examine
 * @param kinesisProxy proxy used to fetch the refreshed shard list
 * @param leaseManager lease manager used to delete the confirmed leases
 * @throws KinesisClientLibIOException propagated from the calls made by this method
 * @throws DependencyException propagated from the calls made by this method
 * @throws InvalidStateException propagated from the calls made by this method
 * @throws ProvisionedThroughputException propagated from the calls made by this method
 */
private void cleanupGarbageLeases(List<Shard> shards,
        List<KinesisClientLease> trackedLeases,
        IKinesisProxy kinesisProxy,
        ILeaseManager<KinesisClientLease> leaseManager)
        throws KinesisClientLibIOException, DependencyException, InvalidStateException,
        ProvisionedThroughputException {
    LOG.info("cleanupGarbageLeases: begin");

    // First pass: screen the tracked leases against the shard list supplied by the caller.
    Set<String> suppliedShardIds = toShardIdSet(shards);
    List<KinesisClientLease> cleanupCandidates = new ArrayList<>();
    for (KinesisClientLease trackedLease : trackedLeases) {
        if (!leaseCleanupValidator.isCandidateForCleanup(trackedLease, suppliedShardIds)) {
            continue;
        }
        cleanupCandidates.add(trackedLease);
    }

    if (!cleanupCandidates.isEmpty()) {
        LOG.info("Found " + cleanupCandidates.size()
                + " candidate leases for cleanup. Refreshing list of Kinesis shards"
                + " to pick up recent/latest shards");

        // Second pass: re-validate each candidate against a freshly fetched shard list
        // before deleting anything.
        Set<String> refreshedShardIds = toShardIdSet(getShardList(kinesisProxy));
        for (KinesisClientLease candidate : cleanupCandidates) {
            if (!leaseCleanupValidator.isCandidateForCleanup(candidate, refreshedShardIds)) {
                continue;
            }
            LOG.info("Deleting lease for shard " + candidate.getLeaseKey()
                    + " as it is not present in Kinesis stream.");
            leaseManager.deleteLease(candidate);
        }
    }

    LOG.info("cleanupGarbageLeases: done");
}

/**
 * Collects the shard ID of every shard in the given list into a new set.
 *
 * @param shardList shards whose IDs are collected
 * @return a new set holding the ID of each shard in {@code shardList}
 */
private Set<String> toShardIdSet(List<Shard> shardList) {
    Set<String> shardIds = new HashSet<>();
    for (Shard shard : shardList) {
        shardIds.add(shard.getShardId());
    }
    return shardIds;
}