in samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java [258:372]
/**
 * Restores state for every task in the container model and then creates the task stores.
 *
 * <p>The steps, in order:
 * <ol>
 *   <li>Read the last checkpoint for each active task (only when a checkpoint manager is configured).</li>
 *   <li>Group stores by state backend factory and build the per-task restore managers. Side input stores
 *       mapped to the blob store state backend factory are added to that factory's restore set.</li>
 *   <li>Clear the logged store base directory when no checkpoint was read for any task.</li>
 *   <li>Init and restore all tasks, blocking until the returned future completes, and merge any
 *       checkpoints it returns into the result.</li>
 *   <li>Stop each distinct store consumer, create the non-side-input stores that are not in-memory,
 *       and add the in-memory stores to {@code taskStores}.</li>
 * </ol>
 *
 * @return the checkpoints read for the active tasks, overlaid with the checkpoints returned by the restore
 * @throws InterruptedException if the calling thread is interrupted while waiting for the restore to finish;
 *         the restore executor is shut down before the exception is rethrown
 * @throws SamzaException if the restore fails for any other reason
 */
private Map<TaskName, Checkpoint> restoreStores() throws InterruptedException {
  LOG.info("Store Restore started");
  Set<TaskName> activeTasks = ContainerStorageManagerUtil.getTasks(containerModel, TaskMode.Active).keySet();

  // Find all non-side input stores
  Set<String> nonSideInputStoreNames = storageEngineFactories.keySet()
      .stream()
      .filter(storeName -> !sideInputStoreNames.contains(storeName))
      .collect(Collectors.toSet());

  // The storage config does not change between tasks, so build it once rather than twice per task.
  StorageConfig storageConfig = new StorageConfig(config);
  String blobStoreStateBackendFactory = BlobStoreStateBackendFactory.class.getName();

  // Obtain the checkpoints for each task
  Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers = new HashMap<>();
  Map<TaskName, Checkpoint> taskCheckpoints = new HashMap<>();
  Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames = new HashMap<>();
  containerModel.getTasks().forEach((taskName, taskModel) -> {
    Checkpoint taskCheckpoint = null;
    if (checkpointManager != null && activeTasks.contains(taskName)) {
      // only pass in checkpoints for active tasks
      taskCheckpoint = checkpointManager.readLastCheckpoint(taskName);
      LOG.info("Obtained checkpoint: {} for state restore for taskName: {}", taskCheckpoint, taskName);
    }

    // Only insert non-null checkpoints
    if (taskCheckpoint != null) {
      taskCheckpoints.put(taskName, taskCheckpoint);
    }

    Map<String, Set<String>> backendFactoryToStoreNames =
        ContainerStorageManagerUtil.getBackendFactoryStoreNames(
            nonSideInputStoreNames, taskCheckpoint, storageConfig);
    Map<String, Set<String>> backendFactoryToSideInputStoreNames =
        ContainerStorageManagerUtil.getBackendFactoryStoreNames(
            sideInputStoreNames, taskCheckpoint, storageConfig);

    // include side input stores for (initial bulk) restore if backed up using blob store state backend
    if (backendFactoryToSideInputStoreNames.containsKey(blobStoreStateBackendFactory)) {
      // Named distinctly from the sideInputStoreNames field so the field is not shadowed here.
      Set<String> blobStoreSideInputStoreNames =
          backendFactoryToSideInputStoreNames.get(blobStoreStateBackendFactory);
      if (backendFactoryToStoreNames.containsKey(blobStoreStateBackendFactory)) {
        backendFactoryToStoreNames.get(blobStoreStateBackendFactory).addAll(blobStoreSideInputStoreNames);
      } else {
        backendFactoryToStoreNames.put(blobStoreStateBackendFactory, blobStoreSideInputStoreNames);
      }
    }

    Map<String, TaskRestoreManager> taskStoreRestoreManagers =
        ContainerStorageManagerUtil.createTaskRestoreManagers(
            taskName, backendFactoryToStoreNames, restoreStateBackendFactories,
            storageEngineFactories, storeConsumers,
            inMemoryStores, systemAdmins, restoreExecutor,
            taskModel, jobContext, containerContext,
            samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes,
            loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
    taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
    taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames);
  });

  // if we have received no input checkpoints, it can only be due to two reasons:
  // a) Samza job is new, so it has no previous checkpoints.
  // b) The checkpoints were cleared.
  // We should be able to safely clear local logged stores in either case
  if (taskCheckpoints.isEmpty()) {
    LOG.info("No checkpoints read. Attempting to clear logged stores.");
    clearLoggedStores(loggedStoreBaseDirectory);
  }

  // Init all taskRestores and if successful, restores all the task stores concurrently
  LOG.debug("Pre init and restore checkpoints is: {}", taskCheckpoints);
  CompletableFuture<Map<TaskName, Checkpoint>> initRestoreAndNewCheckpointFuture =
      ContainerStorageManagerRestoreUtil.initAndRestoreTaskInstances(taskRestoreManagers, samzaContainerMetrics,
          checkpointManager, jobContext, containerModel, taskCheckpoints, taskBackendFactoryToStoreNames, config,
          restoreExecutor, taskInstanceMetrics, loggedStoreBaseDirectory, storeConsumers);

  // Update the task checkpoints map, if it was updated during the restore. Throw an exception if the restore or
  // creating a new checkpoint (in case of BlobStoreBackendFactory) failed.
  try {
    Map<TaskName, Checkpoint> newTaskCheckpoints = initRestoreAndNewCheckpointFuture.get();
    taskCheckpoints.putAll(newTaskCheckpoints);
    LOG.debug("Post init and restore checkpoints is: {}. NewTaskCheckpoints are: {}", taskCheckpoints, newTaskCheckpoints);
  } catch (InterruptedException e) {
    LOG.warn("Received an interrupt during store restoration. Interrupting the restore executor to exit "
        + "prematurely without restoring full state.");
    restoreExecutor.shutdownNow();
    // Rethrown unchanged so the caller sees the interruption.
    throw e;
  } catch (Exception e) {
    LOG.error("Exception when restoring state.", e);
    throw new SamzaException("Exception when restoring state.", e);
  }

  // Stop each store consumer once
  this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop);

  // Now create persistent non-side-input stores in read-write mode, leave non-persistent and side-input stores as-is
  Set<String> inMemoryStoreNames =
      ContainerStorageManagerUtil.getInMemoryStoreNames(this.storageEngineFactories, this.config);
  Set<String> storesToCreate = nonSideInputStoreNames.stream()
      .filter(storeName -> !inMemoryStoreNames.contains(storeName))
      .collect(Collectors.toSet());
  this.taskStores = ContainerStorageManagerUtil.createTaskStores(
      storesToCreate, this.storageEngineFactories, this.sideInputStoreNames,
      this.activeTaskChangelogSystemStreams, this.containerModel, this.jobContext, this.containerContext,
      this.serdes, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil,
      this.loggedStoreBaseDirectory, this.nonLoggedStoreBaseDirectory, this.config);

  // Add in memory stores, creating the per-task map when the task has no created stores yet
  this.inMemoryStores.forEach((taskName, stores) ->
      this.taskStores.computeIfAbsent(taskName, ignored -> new HashMap<>()).putAll(stores));

  LOG.info("Store Restore complete");
  return taskCheckpoints;
}