in samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala [40:117]
def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean,
storeName: String, metrics: KeyValueStoreMetrics): RocksDB = {
var ttl = 0L
var useTTL = false
if (storeConfig.containsKey("rocksdb.ttl.ms")) {
try {
ttl = storeConfig.getLong("rocksdb.ttl.ms")
if (ttl > 0) {
if (ttl < 1000) {
warn("The ttl value requested for %s is %d which is less than 1000 (minimum). " +
"Using 1000 ms instead.", storeName, ttl)
ttl = 1000
}
ttl = TimeUnit.MILLISECONDS.toSeconds(ttl)
} else {
warn("Non-positive TTL for RocksDB implies infinite TTL for the data. " +
"More Info - https://github.com/facebook/rocksdb/wiki/Time-to-Live")
}
useTTL = true
if (isLoggedStore) {
warn("%s is a TTL based store. Changelog is not supported for TTL based stores. " +
"Use at your own discretion." format storeName)
}
} catch {
case nfe: NumberFormatException =>
throw new SamzaException("rocksdb.ttl.ms configuration value %s for store %s is not a number."
format (storeConfig.get("rocksdb.ttl.ms"), storeName), nfe)
}
}
try {
// Create the path if it doesn't exist
new FileUtil().createDirectories(dir.toPath)
val rocksDb =
if (useTTL) {
info("Opening RocksDB store: %s in path: %s with TTL value: %s" format (storeName, dir.toString, ttl))
TtlDB.open(options, dir.toString, ttl.toInt, false)
} else {
RocksDB.open(options, dir.toString)
}
// See https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h for available properties
val rocksDbMetrics = Set (
"rocksdb.estimate-table-readers-mem", // indexes and bloom filters
"rocksdb.cur-size-active-mem-table", // approximate active memtable size in bytes
"rocksdb.cur-size-all-mem-tables", // approximate active and unflushed memtable size in bytes
"rocksdb.size-all-mem-tables", // approximate active, unflushed and pinned memtable size in bytes
"rocksdb.estimate-num-keys" // approximate number keys in the active and unflushed memtable and storage
)
val configuredMetrics = storeConfig
.get("rocksdb.metrics.list", "")
.split(",")
.map(property => property.trim)
.filter(!_.isEmpty)
.toSet
(configuredMetrics ++ rocksDbMetrics)
.foreach(property => metrics.newGauge(property, () =>
// Check isOwningHandle flag. The db is open iff the flag is true.
if (rocksDb.isOwningHandle) {
rocksDb.getProperty(property)
} else {
"0"
}
))
rocksDb
} catch {
case rocksDBException: RocksDBException =>
throw new SamzaException("Error opening RocksDB store %s at location %s" format (storeName, dir.toString),
rocksDBException)
}
}