def openDB()

in samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala [40:117]


  /**
   * Opens the RocksDB instance for a store, optionally as a TTL store, and registers a gauge
   * for each RocksDB property of interest.
   *
   * When `storeConfig` contains "rocksdb.ttl.ms" the store is opened through `TtlDB`:
   * positive values below 1000 ms are raised to 1000 ms and the result is converted to whole
   * seconds; non-positive values are not converted (a warning notes that they imply an
   * infinite TTL). The value handed to `TtlDB.open` is clamped to the `Int` range.
   *
   * @param dir           directory of the store; created if it does not exist
   * @param options       options passed to the RocksDB open call
   * @param storeConfig   store configuration; read for "rocksdb.ttl.ms" and "rocksdb.metrics.list"
   * @param isLoggedStore when true and a TTL is configured, a warning is logged
   * @param storeName     store name used in log and exception messages
   * @param metrics       metrics holder on which one gauge per RocksDB property is created
   * @return the opened database handle
   * @throws SamzaException if the TTL value is not a number, or if opening RocksDB fails
   */
  def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean,
             storeName: String, metrics: KeyValueStoreMetrics): RocksDB = {
    var ttl = 0L
    var useTTL = false

    if (storeConfig.containsKey("rocksdb.ttl.ms")) {
      try {
        ttl = storeConfig.getLong("rocksdb.ttl.ms")

        if (ttl > 0) {
          if (ttl < 1000) {
            // Format the message before logging, like every other message in this method;
            // passing the arguments separately leaves the %s/%d placeholders unsubstituted.
            warn("The ttl value requested for %s is %d which is less than 1000 (minimum). " +
              "Using 1000 ms instead." format (storeName, ttl))
            ttl = 1000
          }
          ttl = TimeUnit.MILLISECONDS.toSeconds(ttl)
        } else {
          warn("Non-positive TTL for RocksDB implies infinite TTL for the data. " +
            "More Info - https://github.com/facebook/rocksdb/wiki/Time-to-Live")
        }

        useTTL = true
        if (isLoggedStore) {
          warn("%s is a TTL based store. Changelog is not supported for TTL based stores. " +
            "Use at your own discretion." format storeName)
        }
      } catch {
        case nfe: NumberFormatException =>
          throw new SamzaException("rocksdb.ttl.ms configuration value %s for store %s is not a number."
            format (storeConfig.get("rocksdb.ttl.ms"), storeName), nfe)
      }
    }

    try {
      // Create the path if it doesn't exist
      new FileUtil().createDirectories(dir.toPath)

      val rocksDb =
        if (useTTL) {
          info("Opening RocksDB store: %s in path: %s with TTL value: %s" format (storeName, dir.toString, ttl))
          // TtlDB.open takes an Int; clamp first so an out-of-range Long cannot wrap around
          // (e.g. a huge TTL turning negative, or a large negative value turning positive).
          val ttlArg = math.max(Int.MinValue.toLong, math.min(Int.MaxValue.toLong, ttl)).toInt
          TtlDB.open(options, dir.toString, ttlArg, false)
        } else {
          RocksDB.open(options, dir.toString)
        }

      // See https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h for available properties
      val rocksDbMetrics = Set (
        "rocksdb.estimate-table-readers-mem", // indexes and bloom filters
        "rocksdb.cur-size-active-mem-table", // approximate active memtable size in bytes
        "rocksdb.cur-size-all-mem-tables", // approximate active and unflushed memtable size in bytes
        "rocksdb.size-all-mem-tables", // approximate active, unflushed and pinned memtable size in bytes
        "rocksdb.estimate-num-keys" // approximate number keys in the active and unflushed memtable and storage
      )

      // Additional properties requested through config: comma-separated, trimmed, blanks dropped.
      val configuredMetrics = storeConfig
        .get("rocksdb.metrics.list", "")
        .split(",")
        .map(property => property.trim)
        .filter(!_.isEmpty)
        .toSet

      (configuredMetrics ++ rocksDbMetrics)
        .foreach(property => metrics.newGauge(property, () =>
          // Check isOwningHandle flag. The db is open iff the flag is true.
          if (rocksDb.isOwningHandle) {
            rocksDb.getProperty(property)
          } else {
            "0"
          }
        ))

      rocksDb
    } catch {
      case rocksDBException: RocksDBException =>
        throw new SamzaException("Error opening RocksDB store %s at location %s" format (storeName, dir.toString),
          rocksDBException)
    }
  }