in amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java [460:586]
/**
 * Reconciles the streams reported by the {@code multiStreamTracker} with the streams this scheduler
 * currently tracks, and cleans up leases of streams that are no longer reported.
 *
 * <p>When {@code shouldSyncStreamsNow()} returns {@code false} nothing is done and an empty set is
 * returned. Otherwise one pass performs, in this order:
 * <ol>
 *   <li>Snapshot the tracker's stream configs, keyed by stream identifier.</li>
 *   <li>Once per scheduler, in multi-stream mode only: fetch the multi-stream leases and hand them to
 *       {@code syncStreamsFromLeaseTableOnAppInit}.</li>
 *   <li>For every stream in the snapshot that is not yet tracked: submit a shard sync task and start
 *       tracking it in {@code currentStreamConfigMap}.</li>
 *   <li>Depending on the configured lease deletion type, either enqueue streams missing from the snapshot
 *       into {@code staleStreamDeletionMap} (deferred deletion), or drop them from
 *       {@code currentStreamConfigMap} right away without deleting their leases.</li>
 *   <li>Delete the leases of enqueued streams whose wait period has elapsed, and un-enqueue streams that
 *       re-appeared in the snapshot.</li>
 *   <li>Restart the stream sync stopwatch and publish the stream count metrics.</li>
 * </ol>
 *
 * <p>The stopwatch is restarted only when the pass completes without throwing; the metrics scope is
 * always closed.
 *
 * @return the identifiers of streams that were newly picked up, dropped from tracking by the non-deferred
 *         branch, or returned by {@code deleteMultiStreamLeases} during this pass; empty if no sync was due
 * @throws DependencyException propagated from the helper calls made during the sync
 * @throws ProvisionedThroughputException propagated from the helper calls made during the sync
 * @throws InvalidStateException propagated from the helper calls made during the sync
 */
Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases()
        throws DependencyException, ProvisionedThroughputException, InvalidStateException {
    final Set<StreamIdentifier> streamsSynced = new HashSet<>();
    if (!shouldSyncStreamsNow()) {
        return streamsSynced;
    }

    final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, MULTI_STREAM_TRACKER);
    try {
        final Duration waitPeriodToDeleteOldStreams =
                formerStreamsLeasesDeletionStrategy.waitPeriodToDeleteFormerStreams();
        // Local snapshot of the tracker's streams, used as the single source of truth for this pass.
        // It is only read below. Collectors.toMap throws IllegalStateException if the tracker reports the
        // same stream identifier twice.
        final Map<StreamIdentifier, StreamConfig> newStreamConfigMap = multiStreamTracker.streamConfigList()
                .stream()
                .collect(Collectors.toMap(StreamConfig::streamIdentifier, sc -> sc));

        // This is done to ensure that we clean up the stale streams lingering in the lease table.
        if (!leasesSyncedOnAppInit && isMultiStreamMode) {
            final List<MultiStreamLease> leases = fetchMultiStreamLeases();
            syncStreamsFromLeaseTableOnAppInit(leases);
            leasesSyncedOnAppInit = true;
        }

        // For new streams discovered, do a shard sync and update the currentStreamConfigMap.
        for (Map.Entry<StreamIdentifier, StreamConfig> discovered : newStreamConfigMap.entrySet()) {
            final StreamIdentifier streamIdentifier = discovered.getKey();
            if (currentStreamConfigMap.containsKey(streamIdentifier)) {
                log.debug("{} is already being processed - skipping shard sync.", streamIdentifier);
                continue;
            }
            final StreamConfig streamConfig = discovered.getValue();
            log.info("Found new stream to process: {}. Syncing shards of that stream.", streamIdentifier);
            createOrGetShardSyncTaskManager(streamConfig).submitShardSyncTask();
            currentStreamConfigMap.put(streamIdentifier, streamConfig);
            streamsSynced.add(streamIdentifier);
        }

        // Enqueues a stream for deferred lease deletion if it is absent from the latest snapshot.
        // putIfAbsent keeps the first-seen timestamp, so the wait period is measured from the first pass
        // in which the stream was found missing.
        final Consumer<StreamIdentifier> enqueueStreamLeaseDeletionOperation = streamIdentifier -> {
            if (!newStreamConfigMap.containsKey(streamIdentifier)) {
                staleStreamDeletionMap.putIfAbsent(streamIdentifier, Instant.now());
            }
        };

        final StreamsLeasesDeletionType leaseDeletionType = formerStreamsLeasesDeletionStrategy.leaseDeletionType();
        if (leaseDeletionType == StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) {
            // Identify the stale/old streams and enqueue them for deferred deletion. This assumes that all
            // workers eventually have the latest, consistent snapshot of streams from the multiStreamTracker.
            //
            // Unsafe transition #1: Worker 2, on initialization, learns about D from the lease table and
            // deletes the leases for D because D is not in its (stale) MultiStreamTracker.
            //   Worker 1 : A,B,C -> A,B,C,D (latest)
            //   Worker 2 : BOOTS_UP -> A,B,C (stale)
            //
            // Unsafe transition #2: Worker 2 might end up deleting the leases for A and D and lose the
            // progress made so far.
            //   Worker 1 : A,B,C -> A,B,C,D (latest)
            //   Worker 2 : A,B,C -> B,C (stale/partial)
            //
            // To give workers with stale stream info sufficient time to learn about the new streams before
            // their leases are deleted, the deletion is deferred by the configured wait period.
            currentStreamConfigMap.keySet().forEach(enqueueStreamLeaseDeletionOperation);
        } else if (leaseDeletionType == StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) {
            // The strategy names the streams to clean up; a null list means there is nothing to enqueue.
            Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiersForLeaseCleanup())
                    .ifPresent(streamIdentifiers -> streamIdentifiers.forEach(enqueueStreamLeaseDeletionOperation));
        } else {
            // Remove the old/stale streams identified through the new and existing streams list, without
            // cleaning up their leases. Disabling deprecated shard sync + lease cleanup through a flag.
            // An explicit iterator is used so entries can be removed while walking the key set.
            final Iterator<StreamIdentifier> trackedStreams = currentStreamConfigMap.keySet().iterator();
            while (trackedStreams.hasNext()) {
                final StreamIdentifier streamIdentifier = trackedStreams.next();
                if (newStreamConfigMap.containsKey(streamIdentifier)) {
                    continue;
                }
                if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) {
                    log.info(
                            "Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams.",
                            streamIdentifier);
                    // Look the config up before the entry is removed below.
                    createOrGetShardSyncTaskManager(currentStreamConfigMap.get(streamIdentifier))
                            .submitShardSyncTask();
                } else {
                    log.info(
                            "Found old/deleted stream : {}. Removing from tracked active streams, but not cleaning up leases,"
                                    + " as part of this workflow", streamIdentifier);
                }
                trackedStreams.remove();
                streamsSynced.add(streamIdentifier);
            }
        }

        // Now scan the streams enqueued for deferred deletion. A stream is eligible for deletion only when
        // the deferment period has elapsed and it is not present in the latest snapshot.
        // Partition key: true = present in the snapshot again (revive), false = still missing.
        final Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet()
                .stream()
                .collect(Collectors.partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet()));
        final long waitPeriodMillis = waitPeriodToDeleteOldStreams.toMillis();
        final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false)
                .stream()
                .filter(streamIdentifier ->
                        Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis()
                                >= waitPeriodMillis)
                .collect(Collectors.toSet());
        final Set<StreamIdentifier> deletedStreamsLeases = deleteMultiStreamLeases(staleStreamIdsToBeDeleted);
        streamsSynced.addAll(deletedStreamsLeases);

        // Purge the active streams from stale streams list.
        final Set<StreamIdentifier> staleStreamIdsToBeRevived = staleStreamIdDeletionDecisionMap.get(true);
        removeStreamsFromStaleStreamsList(staleStreamIdsToBeRevived);

        if (!staleStreamDeletionMap.isEmpty()) {
            // Report each pending stream with the instant at which it becomes eligible for deletion.
            log.warn(
                    "Streams enqueued for deletion for lease table cleanup along with their scheduled time for deletion: {} ",
                    staleStreamDeletionMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
                            entry -> entry.getValue().plus(waitPeriodToDeleteOldStreams))));
        }

        // Reached only when the pass did not throw, so a failed pass leaves the stopwatch untouched.
        streamSyncWatch.reset().start();

        MetricsUtil.addCount(metricsScope, ACTIVE_STREAMS_COUNT, newStreamConfigMap.size(), MetricsLevel.SUMMARY);
        MetricsUtil.addCount(metricsScope, PENDING_STREAMS_DELETION_COUNT, staleStreamDeletionMap.size(),
                MetricsLevel.SUMMARY);
        MetricsUtil.addCount(metricsScope, DELETED_STREAMS_COUNT, deletedStreamsLeases.size(), MetricsLevel.SUMMARY);
    } finally {
        MetricsUtil.endScope(metricsScope);
    }
    return streamsSynced;
}