protected abstract KeyValueStore getKVStore()

in samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java [68:145]


  protected abstract KeyValueStore<byte[], byte[]> getKVStore(String storeName,
      File storeDir,
      MetricsRegistry registry,
      JobContext jobContext,
      ContainerContext containerContext,
      StoreMode storeMode);

  /**
   * Constructs a key-value StorageEngine and returns it to the caller
   *
   * @param storeName The name of the storage engine.
   * @param storeDir The directory of the storage engine.
   * @param keySerde The serializer to use for serializing keys when reading or writing to the store.
   * @param msgSerde The serializer to use for serializing messages when reading or writing to the store.
   * @param changelogCollector MessageCollector the storage engine uses to persist changes.
   * @param registry MetricsRegistry to which to publish storage-engine specific metrics.
   * @param changelogSSP Samza system stream partition from which to receive the changelog.
   * @param containerContext Information about the container in which the task is executing.
   **/
  public StorageEngine getStorageEngine(String storeName,
      File storeDir,
      Serde<K> keySerde,
      Serde<V> msgSerde,
      MessageCollector changelogCollector,
      MetricsRegistry registry,
      SystemStreamPartition changelogSSP,
      JobContext jobContext,
      ContainerContext containerContext,
      StoreMode storeMode) {
    Config storageConfigSubset = jobContext.getConfig().subset("stores." + storeName + ".", true);
    StorageConfig storageConfig = new StorageConfig(jobContext.getConfig());
    Optional<String> storeFactory = storageConfig.getStorageFactoryClassName(storeName);
    StoreProperties.StorePropertiesBuilder storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder();
    if (!storeFactory.isPresent() || StringUtils.isBlank(storeFactory.get())) {
      throw new SamzaException(
          String.format("Store factory not defined for store %s. Cannot proceed with KV store creation!", storeName));
    }
    if (!storeFactory.get().equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY)) {
      storePropertiesBuilder.setPersistedToDisk(true);
    }
    // The store is durable iff it is backed by the task backup manager
    List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
    storePropertiesBuilder.setIsDurable(!storeBackupManagers.isEmpty());

    int batchSize = storageConfigSubset.getInt(WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
    int cacheSize = storageConfigSubset.getInt(OBJECT_CACHE_SIZE, Math.max(batchSize, DEFAULT_OBJECT_CACHE_SIZE));
    if (cacheSize > 0 && cacheSize < batchSize) {
      throw new SamzaException(
          String.format("cache.size for store %s cannot be less than batch.size as batched values reside in cache.",
              storeName));
    }
    if (keySerde == null) {
      throw new SamzaException(
          String.format("Must define a key serde when using key value storage for store %s.", storeName));
    }
    if (msgSerde == null) {
      throw new SamzaException(
          String.format("Must define a message serde when using key value storage for store %s.", storeName));
    }

    KeyValueStore<byte[], byte[]> rawStore =
        getKVStore(storeName, storeDir, registry, jobContext, containerContext, storeMode);
    KeyValueStore<byte[], byte[]> maybeLoggedStore = buildMaybeLoggedStore(changelogSSP,
        storeName, registry, storePropertiesBuilder, rawStore, changelogCollector);
    // this also applies serialization and caching layers
    KeyValueStore<K, V> toBeAccessLoggedStore = buildStoreWithLargeMessageHandling(storeName, registry,
        maybeLoggedStore, storageConfig, cacheSize, batchSize, keySerde, msgSerde);
    KeyValueStore<K, V> maybeAccessLoggedStore =
        buildMaybeAccessLoggedStore(storeName, toBeAccessLoggedStore, changelogCollector, changelogSSP, storageConfig,
            keySerde);
    KeyValueStore<K, V> nullSafeStore = new NullSafeKeyValueStore<>(maybeAccessLoggedStore);

    KeyValueStorageEngineMetrics keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry);
    HighResolutionClock clock = buildClock(jobContext.getConfig());
    return new KeyValueStorageEngine<>(storeName, storeDir, storePropertiesBuilder.build(), nullSafeStore, rawStore,
        changelogSSP, changelogCollector, keyValueStorageEngineMetrics, batchSize,
        ScalaJavaUtil.toScalaFunction(clock::nanoTime));
  }