in flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackendBuilder.java [336:513]
// Builds the RocksDBKeyedStateBackend described by this builder.
//
// Steps, in order:
//   1. Obtain the RocksDB instance, default column family handle and native metric monitor,
//      either from the injected test database or by preparing the directories and running a
//      restore operation.
//   2. Create the write batch wrapper, the shared composite key builder, the snapshot
//      strategy, the manual compaction manager and the priority queue factory.
//   3. Hand all of the above to the RocksDBKeyedStateBackend constructor.
//
// If any step inside the try block throws, everything created so far is closed quietly, the
// instance base path is deleted and the failure is rethrown.
//
// Returns: the newly constructed keyed state backend.
// Throws: BackendBuildingException - rethrown unchanged if it was the original failure;
//         any other Throwable is logged and wrapped into a BackendBuildingException.
public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
// The locals below that start out as null are assigned inside the try block. They are declared
// out here so that the catch block can release whatever had already been created when a
// failure occurred; some of them may therefore still be null during cleanup.
RocksDBWriteBatchWrapper writeBatchWrapper = null;
ColumnFamilyHandle defaultColumnFamilyHandle = null;
RocksDBNativeMetricMonitor nativeMetricMonitor = null;
// Registry handed to both the restore operation and the final backend; closed on failure.
CloseableRegistry cancelRegistryForBackend = new CloseableRegistry();
// State meta information maps: passed to the restore operation, the snapshot strategy / priority
// queue factory initialization and finally to the backend itself.
LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation =
new LinkedHashMap<>();
LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates =
new LinkedHashMap<>();
RocksDB db = null;
RocksDBRestoreOperation restoreOperation = null;
// Stays null unless an io executor is available and the restore result provides a compaction
// task (see below).
CompletableFuture<Void> asyncCompactAfterRestoreFuture = null;
RocksDbTtlCompactFiltersManager ttlCompactFiltersManager =
new RocksDbTtlCompactFiltersManager(
ttlTimeProvider,
optionsContainer.getQueryTimeAfterNumEntries(),
optionsContainer.getPeriodicCompactionTime());
ResourceGuard rocksDBResourceGuard = new ResourceGuard();
RocksDBSnapshotStrategyBase<K, ?> checkpointStrategy = null;
// The next two locals (and manualCompactionManager further down) have no initializer: they are
// assigned inside the try block and the catch block always throws, so they are definitely
// assigned by the time they are read after the try/catch.
PriorityQueueSetFactory priorityQueueFactory;
SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
// Number of bytes required to prefix the key groups.
int keyGroupPrefixBytes =
CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(
numberOfKeyGroups);
RocksDBManualCompactionManager manualCompactionManager;
try {
// Variables for snapshot strategy when incremental checkpoint is enabled.
// These are the defaults; they are only overridden below when the restore operation is a
// RocksDBIncrementalRestoreOperation.
UUID backendUID = UUID.randomUUID();
SortedMap<Long, Collection<HandleAndLocalPath>> materializedSstFiles = new TreeMap<>();
long lastCompletedCheckpointId = -1L;
if (injectedTestDB != null) {
// An injected database is used as-is: no directories are prepared and no restore is run.
db = injectedTestDB;
defaultColumnFamilyHandle = injectedDefaultColumnFamilyHandle;
// The metric monitor is only created when native metrics are enabled.
nativeMetricMonitor =
nativeMetricOptions.isEnabled()
? new RocksDBNativeMetricMonitor(
nativeMetricOptions, metricGroup, db, null)
: null;
} else {
prepareDirectories();
restoreOperation =
getRocksDBRestoreOperation(
keyGroupPrefixBytes,
rocksDBResourceGuard,
cancelStreamRegistry,
cancelRegistryForBackend,
kvStateInformation,
registeredPQStates,
ttlCompactFiltersManager);
// The restore result supplies the db, the default column family handle and the metric
// monitor used for the rest of the build.
RocksDBRestoreResult restoreResult = restoreOperation.restore();
db = restoreResult.getDb();
defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle();
nativeMetricMonitor = restoreResult.getNativeMetricMonitor();
// If the restore result carries a compaction task, run it asynchronously on the io
// executor; without an executor, or without a task, the future stays null.
if (ioExecutor != null) {
asyncCompactAfterRestoreFuture =
restoreResult
.getAsyncCompactTaskAfterRestore()
.map((task) -> CompletableFuture.runAsync(task, ioExecutor))
.orElse(null);
}
// Only an incremental restore replaces the snapshot-strategy defaults set above.
if (restoreOperation instanceof RocksDBIncrementalRestoreOperation) {
backendUID = restoreResult.getBackendUID();
materializedSstFiles = restoreResult.getRestoredSstFiles();
lastCompletedCheckpointId = restoreResult.getLastCompletedCheckpointId();
}
}
writeBatchWrapper =
new RocksDBWriteBatchWrapper(
db, optionsContainer.getWriteOptions(), writeBatchSize);
// It is important that we only create the key builder after the restore, and not before:
// restore operations may reconfigure the key serializer, so only by accessing the key
// serializer now can we be certain that the serializer used in the builder is final.
// NOTE(review): the literal 32 is presumably an initial buffer capacity - confirm against
// the SerializedCompositeKeyBuilder constructor.
sharedRocksKeyBuilder =
new SerializedCompositeKeyBuilder<>(
keySerializerProvider.currentSchemaSerializer(),
keyGroupPrefixBytes,
32);
// init snapshot strategy after db is assured to be initialized
checkpointStrategy =
initializeSavepointAndCheckpointStrategies(
rocksDBResourceGuard,
kvStateInformation,
keyGroupPrefixBytes,
db,
backendUID,
materializedSstFiles,
lastCompletedCheckpointId);
// init the manual compaction manager first, because the priority queue factory
// initialization below takes it as an argument
manualCompactionManager =
RocksDBManualCompactionManager.create(db, manualCompactionConfig, ioExecutor);
priorityQueueFactory =
initPriorityQueueFactory(
keyGroupPrefixBytes,
kvStateInformation,
db,
writeBatchWrapper,
nativeMetricMonitor,
manualCompactionManager);
} catch (Throwable e) {
// Do clean up
// Throwable (not just Exception) is caught, so the cleanup also runs for Errors.
// Several of the resources below may still be null at this point.
// NOTE(review): this relies on IOUtils.closeQuietly tolerating null - confirm.
// NOTE(review): the release order looks deliberate (column family options are collected
// before their handles are closed, handles are closed before the db, and the collected
// options are closed only after the db) - do not reorder without checking.
List<ColumnFamilyOptions> columnFamilyOptions =
new ArrayList<>(kvStateInformation.values().size());
IOUtils.closeQuietly(cancelRegistryForBackend);
IOUtils.closeQuietly(writeBatchWrapper);
IOUtils.closeQuietly(rocksDBResourceGuard);
RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(
columnFamilyOptions, defaultColumnFamilyHandle);
IOUtils.closeQuietly(defaultColumnFamilyHandle);
IOUtils.closeQuietly(nativeMetricMonitor);
// Same treatment for the column family handle of every registered state.
for (RocksDBKeyedStateBackend.RocksDbKvStateInfo kvStateInfo :
kvStateInformation.values()) {
RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(
columnFamilyOptions, kvStateInfo.columnFamilyHandle);
IOUtils.closeQuietly(kvStateInfo.columnFamilyHandle);
}
IOUtils.closeQuietly(db);
// it's possible that db has been initialized but later restore steps failed
IOUtils.closeQuietly(restoreOperation);
IOUtils.closeAllQuietly(columnFamilyOptions);
IOUtils.closeQuietly(optionsContainer);
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
kvStateInformation.clear();
IOUtils.closeQuietly(checkpointStrategy);
// Remove the instance base path; a failure to do so is only logged so that the original
// exception is the one that propagates.
try {
FileUtils.deleteDirectory(instanceBasePath);
} catch (Exception ex) {
logger.warn("Failed to delete base path for RocksDB: " + instanceBasePath, ex);
}
// Log and rethrow
// A BackendBuildingException is passed through unchanged (and not logged here); anything
// else is logged and wrapped so that callers only see the declared exception type.
if (e instanceof BackendBuildingException) {
throw (BackendBuildingException) e;
} else {
String errMsg = "Caught unexpected exception.";
logger.error(errMsg, e);
throw new BackendBuildingException(errMsg, e);
}
}
// Reaching this point means every step in the try block succeeded.
InternalKeyContext<K> keyContext =
new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups);
logger.info("Finished building RocksDB keyed state-backend at {}.", instanceBasePath);
// Arguments are positional; keep them in exactly the order the backend constructor expects.
return new RocksDBKeyedStateBackend<>(
this.userCodeClassLoader,
this.instanceBasePath,
this.optionsContainer,
columnFamilyOptionsFactory,
this.kvStateRegistry,
this.keySerializerProvider.currentSchemaSerializer(),
this.executionConfig,
this.ttlTimeProvider,
latencyTrackingStateConfig,
sizeTrackingStateConfig,
db,
kvStateInformation,
registeredPQStates,
keyGroupPrefixBytes,
cancelRegistryForBackend,
this.keyGroupCompressionDecorator,
rocksDBResourceGuard,
checkpointStrategy,
writeBatchWrapper,
defaultColumnFamilyHandle,
nativeMetricMonitor,
sharedRocksKeyBuilder,
priorityQueueFactory,
ttlCompactFiltersManager,
keyContext,
writeBatchSize,
asyncCompactAfterRestoreFuture,
manualCompactionManager);
}