in samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java [91:156]
public TaskRestoreManager getRestoreManager(JobContext jobContext,
ContainerContext containerContext,
TaskModel taskModel,
ExecutorService restoreExecutor,
MetricsRegistry metricsRegistry,
Set<String> storesToRestore,
Config config,
Clock clock,
File loggedStoreBaseDir,
File nonLoggedStoreBaseDir,
KafkaChangelogRestoreParams kafkaChangelogRestoreParams) {
Map<String, SystemStream> storeChangelogs = new StorageConfig(config).getStoreChangelogs();
Set<SystemStreamPartition> changelogSSPs = storeChangelogs.values().stream()
.flatMap(ss -> containerContext.getContainerModel().getTasks().values().stream()
.map(tm -> new SystemStreamPartition(ss, tm.getChangelogPartition())))
.collect(Collectors.toSet());
// filter out standby store-ssp pairs
Map<String, SystemStream> filteredStoreChangelogs =
filterStandbySystemStreams(storeChangelogs, containerContext.getContainerModel());
SystemAdmins systemAdmins = new SystemAdmins(kafkaChangelogRestoreParams.getSystemAdmins());
if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
return new TransactionalStateTaskRestoreManager(
storesToRestore,
jobContext,
containerContext,
taskModel,
restoreExecutor,
filteredStoreChangelogs,
kafkaChangelogRestoreParams.getInMemoryStores(),
kafkaChangelogRestoreParams.getStorageEngineFactories(),
kafkaChangelogRestoreParams.getSerdes(),
systemAdmins,
kafkaChangelogRestoreParams.getStoreConsumers(),
metricsRegistry,
kafkaChangelogRestoreParams.getCollector(),
getSspCache(systemAdmins, clock, changelogSSPs),
loggedStoreBaseDir,
nonLoggedStoreBaseDir,
config,
clock
);
} else {
return new NonTransactionalStateTaskRestoreManager(
storesToRestore,
jobContext,
containerContext,
taskModel,
restoreExecutor,
filteredStoreChangelogs,
kafkaChangelogRestoreParams.getInMemoryStores(),
kafkaChangelogRestoreParams.getStorageEngineFactories(),
kafkaChangelogRestoreParams.getSerdes(),
systemAdmins,
getStreamCache(systemAdmins, clock),
kafkaChangelogRestoreParams.getStoreConsumers(),
metricsRegistry,
kafkaChangelogRestoreParams.getCollector(),
jobContext.getJobModel().getMaxChangeLogStreamPartitions(),
loggedStoreBaseDir,
nonLoggedStoreBaseDir,
config,
clock
);
}
}