Set checkAndSyncStreamShardsAndLeases()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java [460:586]


    /**
     * Reconciles the streams reported by {@code multiStreamTracker} with the streams tracked in
     * {@code currentStreamConfigMap}, and processes the deferred lease deletion queue
     * ({@code staleStreamDeletionMap}).
     *
     * <p>Nothing is done unless {@code shouldSyncStreamsNow()} returns {@code true}. When a sync runs it:
     * <ol>
     *   <li>on the first run in multi-stream mode, fetches the multi-stream leases and hands them to
     *       {@code syncStreamsFromLeaseTableOnAppInit};</li>
     *   <li>submits a shard sync task for every stream in the tracker's snapshot that is not yet in
     *       {@code currentStreamConfigMap}, and adds it to that map;</li>
     *   <li>depending on the strategy's {@code StreamsLeasesDeletionType}, either enqueues streams that are
     *       absent from the snapshot into {@code staleStreamDeletionMap}, or removes them from
     *       {@code currentStreamConfigMap} immediately;</li>
     *   <li>passes enqueued streams whose wait period has elapsed to {@code deleteMultiStreamLeases}, and
     *       passes enqueued streams that are present in the snapshot again to
     *       {@code removeStreamsFromStaleStreamsList};</li>
     *   <li>restarts {@code streamSyncWatch} and records the stream count metrics.</li>
     * </ol>
     *
     * @return identifiers of the streams that were newly tracked, removed from tracking, or returned by
     *         {@code deleteMultiStreamLeases} during this call; empty when no sync was due
     * @throws DependencyException declared by this method; presumably propagated from the lease table
     *         calls - confirm against the helpers
     * @throws ProvisionedThroughputException declared by this method; presumably propagated from the lease
     *         table calls - confirm against the helpers
     * @throws InvalidStateException declared by this method; presumably propagated from the lease table
     *         calls - confirm against the helpers
     */
    Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases()
            throws DependencyException, ProvisionedThroughputException, InvalidStateException {
        final Set<StreamIdentifier> streamsSynced = new HashSet<>();

        if (!shouldSyncStreamsNow()) {
            return streamsSynced;
        }

        final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, MULTI_STREAM_TRACKER);
        try {
            final Duration waitPeriodToDeleteOldStreams =
                    formerStreamsLeasesDeletionStrategy.waitPeriodToDeleteFormerStreams();

            // Take a private snapshot of the tracker's stream configs, keyed by stream identifier, so that
            // every decision below is made against the same view. Collectors.toMap gives no guarantee about
            // the type of map it returns, hence the explicit HashMap copy. The map is only read afterwards.
            final Map<StreamIdentifier, StreamConfig> newStreamConfigMap =
                    new HashMap<>(multiStreamTracker.streamConfigList().stream()
                            .collect(Collectors.toMap(StreamConfig::streamIdentifier, streamConfig -> streamConfig)));

            // This is done to ensure that we clean up the stale streams lingering in the lease table.
            if (!leasesSyncedOnAppInit && isMultiStreamMode) {
                syncStreamsFromLeaseTableOnAppInit(fetchMultiStreamLeases());
                leasesSyncedOnAppInit = true;
            }

            // For new streams discovered, do a shard sync and update the currentStreamConfigMap.
            for (Map.Entry<StreamIdentifier, StreamConfig> snapshotEntry : newStreamConfigMap.entrySet()) {
                final StreamIdentifier streamIdentifier = snapshotEntry.getKey();
                if (currentStreamConfigMap.containsKey(streamIdentifier)) {
                    log.debug("{} is already being processed - skipping shard sync.", streamIdentifier);
                    continue;
                }
                log.info("Found new stream to process: {}. Syncing shards of that stream.", streamIdentifier);
                final StreamConfig streamConfig = snapshotEntry.getValue();
                createOrGetShardSyncTaskManager(streamConfig).submitShardSyncTask();
                currentStreamConfigMap.put(streamIdentifier, streamConfig);
                streamsSynced.add(streamIdentifier);
            }

            // Enqueues a stream for deferred lease deletion, but only if it is absent from the latest snapshot.
            // putIfAbsent keeps the original enqueue time when the stream is already waiting.
            final Consumer<StreamIdentifier> enqueueStreamLeaseDeletionOperation = streamIdentifier -> {
                if (!newStreamConfigMap.containsKey(streamIdentifier)) {
                    staleStreamDeletionMap.putIfAbsent(streamIdentifier, Instant.now());
                }
            };

            final StreamsLeasesDeletionType leaseDeletionType = formerStreamsLeasesDeletionStrategy.leaseDeletionType();
            if (leaseDeletionType == StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION) {
                // Identify the stale/old streams and enqueue them for deferred deletion.
                // It is assumed that all the workers will always have the latest and consistent snapshot of
                // streams from the multiStreamTracker.
                //
                // The following stream transition between two workers is NOT considered safe: Worker 2, on
                // initialization, learns about D from the lease table and deletes the leases for D, because D
                // is not available in its latest MultiStreamTracker.
                // Worker 1 : A,B,C -> A,B,C,D (latest)
                // Worker 2 : BOOTS_UP -> A,B,C (stale)
                //
                // The following stream transition between two workers is NOT considered safe either: Worker 2
                // might end up deleting the leases for A and D and lose the progress made so far.
                // Worker 1 : A,B,C -> A,B,C,D (latest)
                // Worker 2 : A,B,C -> B,C (stale/partial)
                //
                // To give workers with stale stream info sufficient time to learn about the new streams before
                // their leases are deleted, the deletion is deferred by the configured wait period.
                currentStreamConfigMap.keySet().forEach(enqueueStreamLeaseDeletionOperation);
            } else if (leaseDeletionType == StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) {
                // The strategy supplies the clean-up candidates itself; a null list means there are none.
                Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiersForLeaseCleanup())
                        .ifPresent(streamIdentifiers -> streamIdentifiers.forEach(enqueueStreamLeaseDeletionOperation));
            } else {
                // Remove the old/stale streams identified through the new and existing streams list, without
                // cleaning up their leases. Disabling deprecated shard sync + lease cleanup through a flag.
                // An explicit iterator is used so entries can be removed while the key set is being traversed.
                final Iterator<StreamIdentifier> currentSetOfStreamsIter = currentStreamConfigMap.keySet().iterator();
                while (currentSetOfStreamsIter.hasNext()) {
                    final StreamIdentifier streamIdentifier = currentSetOfStreamsIter.next();
                    if (newStreamConfigMap.containsKey(streamIdentifier)) {
                        continue;
                    }
                    if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) {
                        log.info(
                                "Found old/deleted stream : {}. Triggering shard sync. Removing from tracked active streams.", streamIdentifier);
                        // The config must be read before the iterator removes the entry below.
                        createOrGetShardSyncTaskManager(currentStreamConfigMap.get(streamIdentifier))
                                .submitShardSyncTask();
                    } else {
                        log.info(
                                "Found old/deleted stream : {}. Removing from tracked active streams, but not cleaning up leases,"
                                        + " as part of this workflow", streamIdentifier);
                    }
                    currentSetOfStreamsIter.remove();
                    streamsSynced.add(streamIdentifier);
                }
            }

            // Now scan the streams enqueued for deferred deletion. A stream is eligible for deletion only when
            // its deferment period has elapsed and it is not present in the latest snapshot.
            // Partition key: true = present in the snapshot again (revive), false = still absent.
            final Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap =
                    staleStreamDeletionMap.keySet().stream()
                            .collect(Collectors.partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet()));

            // Evaluate every enqueued stream against the same instant and the same threshold.
            final Instant now = Instant.now();
            final long waitPeriodMillis = waitPeriodToDeleteOldStreams.toMillis();
            final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream()
                    .filter(streamIdentifier ->
                            Duration.between(staleStreamDeletionMap.get(streamIdentifier), now).toMillis()
                                    >= waitPeriodMillis)
                    .collect(Collectors.toSet());
            final Set<StreamIdentifier> deletedStreamsLeases = deleteMultiStreamLeases(staleStreamIdsToBeDeleted);
            streamsSynced.addAll(deletedStreamsLeases);

            // Purge the active streams from stale streams list.
            final Set<StreamIdentifier> staleStreamIdsToBeRevived = staleStreamIdDeletionDecisionMap.get(true);
            removeStreamsFromStaleStreamsList(staleStreamIdsToBeRevived);

            if (!staleStreamDeletionMap.isEmpty()) {
                // Report each pending stream together with the time it becomes eligible for deletion.
                log.warn(
                        "Streams enqueued for deletion for lease table cleanup along with their scheduled time for deletion: {} ",
                        staleStreamDeletionMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
                                entry -> entry.getValue().plus(waitPeriodToDeleteOldStreams))));
            }

            // Restart the stopwatch; it is only reached when the sync above completed without throwing.
            streamSyncWatch.reset().start();

            MetricsUtil.addCount(metricsScope, ACTIVE_STREAMS_COUNT, newStreamConfigMap.size(), MetricsLevel.SUMMARY);
            MetricsUtil.addCount(metricsScope, PENDING_STREAMS_DELETION_COUNT, staleStreamDeletionMap.size(),
                    MetricsLevel.SUMMARY);
            MetricsUtil.addCount(metricsScope, DELETED_STREAMS_COUNT, deletedStreamsLeases.size(), MetricsLevel.SUMMARY);
        } finally {
            // Always close the metrics scope, even when the sync fails part-way.
            MetricsUtil.endScope(metricsScope);
        }
        return streamsSynced;
    }