in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/DynamoDBStreamsShardSyncer.java [117:164]
private synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
LOG.info("syncShardLeases: begin");
List<Shard> shards = getShardList(kinesisProxy);
LOG.debug("Num shards: " + shards.size());
Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shards);
Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap);
Set<String> inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap);
if (!ignoreUnexpectedChildShards) {
assertAllParentShardsAreClosed(inconsistentShardIds);
}
List<KinesisClientLease> currentLeases = leaseManager.listLeases();
List<KinesisClientLease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition,
inconsistentShardIds);
LOG.debug("Num new leases to create: " + newLeasesToCreate.size());
for (KinesisClientLease lease : newLeasesToCreate) {
long startTimeMillis = System.currentTimeMillis();
boolean success = false;
try {
leaseManager.createLeaseIfNotExists(lease);
success = true;
} finally {
MetricsHelper.addSuccessAndLatency("CreateLease", startTimeMillis, success, MetricsLevel.DETAILED);
}
}
List<KinesisClientLease> trackedLeases = new ArrayList<>();
if (currentLeases != null) {
trackedLeases.addAll(currentLeases);
}
trackedLeases.addAll(newLeasesToCreate);
cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager);
if (cleanupLeasesOfCompletedShards) {
cleanupLeasesOfFinishedShards(currentLeases,
shardIdToShardMap,
shardIdToChildShardIdsMap,
trackedLeases,
leaseManager);
}
LOG.info("syncShardLeases: done");
}