public static Options options()

in samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java [89:158]


  /**
   * Builds the RocksDB {@link Options} for a store from its configuration.
   *
   * <p>Values not present in {@code storeConfig} fall back to the defaults written inline below.
   * The caller receives a freshly created {@link Options} instance.
   *
   * @param storeConfig config the RocksDB settings are read from
   * @param numTasksForContainer number of tasks the container-wide write buffer size is divided
   *     among (also forwarded to {@code getBlockCacheSize}); used as an integer divisor, so it
   *     must be non-zero
   * @param defaultMaxManifestFileSize max manifest file size used when
   *     {@code ROCKSDB_MAX_MANIFEST_FILE_SIZE} is not configured
   * @param storeDir store directory; bulk-load tuning is applied only if it does not exist yet
   * @param storeMode mode the store is requested in; {@code BulkLoad} enables bulk-load tuning
   * @return the configured options
   */
  public static Options options(Config storeConfig, int numTasksForContainer, long defaultMaxManifestFileSize,
      File storeDir, StorageEngineFactory.StoreMode storeMode) {
    Options options = new Options();

    if (storeConfig.getBoolean(ROCKSDB_WAL_ENABLED, false)) {
      options.setManualWalFlush(true); // store.flush() will flushWAL(sync = true) instead
      options.setWalRecoveryMode(WALRecoveryMode.AbsoluteConsistency);
    }

    long writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024);
    // Cache size and write buffer size are specified on a per-container basis.
    // Keep the per-task share as a long: setWriteBufferSize accepts a long, and narrowing to int
    // would silently truncate any per-task buffer larger than Integer.MAX_VALUE bytes.
    options.setWriteBufferSize(writeBufSize / numTasksForContainer);

    // Snappy is both the default codec and the fallback for unrecognized config values.
    CompressionType compressionType = CompressionType.SNAPPY_COMPRESSION;
    String compressionInConfig = storeConfig.get(ROCKSDB_COMPRESSION, "snappy");
    switch (compressionInConfig) {
      case "snappy":
        compressionType = CompressionType.SNAPPY_COMPRESSION;
        break;
      case "bzip2":
        compressionType = CompressionType.BZLIB2_COMPRESSION;
        break;
      case "zlib":
        compressionType = CompressionType.ZLIB_COMPRESSION;
        break;
      case "lz4":
        compressionType = CompressionType.LZ4_COMPRESSION;
        break;
      case "lz4hc":
        compressionType = CompressionType.LZ4HC_COMPRESSION;
        break;
      case "none":
        compressionType = CompressionType.NO_COMPRESSION;
        break;
      default:
        log.warn("Unknown rocksdb.compression codec " + compressionInConfig +
            ", overwriting to " + compressionType.name());
    }
    options.setCompressionType(compressionType);

    long blockCacheSize = getBlockCacheSize(storeConfig, numTasksForContainer);
    int blockSize = storeConfig.getInt(ROCKSDB_BLOCK_SIZE_BYTES, 4096);
    BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
    tableOptions.setBlockCacheSize(blockCacheSize).setBlockSize(blockSize);
    options.setTableFormatConfig(tableOptions);

    setCompactionOptions(storeConfig, options);

    options.setMaxWriteBufferNumber(storeConfig.getInt(ROCKSDB_NUM_WRITE_BUFFERS, 3));
    options.setCreateIfMissing(true);
    options.setErrorIfExists(false);

    options.setMaxLogFileSize(storeConfig.getLong(ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, 64 * 1024 * 1024L));
    options.setKeepLogFileNum(storeConfig.getLong(ROCKSDB_KEEP_LOG_FILE_NUM, 2));
    options.setDeleteObsoleteFilesPeriodMicros(storeConfig.getLong(ROCKSDB_DELETE_OBSOLETE_FILES_PERIOD_MICROS, 21600000000L));
    options.setMaxOpenFiles(storeConfig.getInt(ROCKSDB_MAX_OPEN_FILES, -1));
    options.setMaxFileOpeningThreads(storeConfig.getInt(ROCKSDB_MAX_FILE_OPENING_THREADS, 16));
    // The default for rocksdb is 1GB (1024*1024*1024 bytes)
    options.setMaxManifestFileSize(storeConfig.getLong(ROCKSDB_MAX_MANIFEST_FILE_SIZE, defaultMaxManifestFileSize));
    // use prepareForBulk load only when i. the store is being requested in BulkLoad mode
    // and ii. the storeDirectory does not exist (fresh restore), because bulk load does not work seamlessly with
    // existing stores : https://github.com/facebook/rocksdb/issues/2734
    StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
    if (storeMode.equals(StorageEngineFactory.StoreMode.BulkLoad) && !storageManagerUtil.storeExists(storeDir)) {
      log.info("Using prepareForBulkLoad for restore to " + storeDir);
      options.prepareForBulkLoad();
    }

    return options;
  }