in samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala [126:218]
/**
 * Restores this store's contents from its changelog.
 *
 * Messages from the iterator are handled according to the iterator's current mode:
 *  - RESTORE: the key/value pair is buffered and written to `rawStore` in batches of up to
 *    `batchSize` entries.
 *  - any other mode (TRIM): the key's current value in `rawStore` is sent to the changelog via
 *    `changelogCollector`, overwriting the changelog value being trimmed.
 * All RESTORE messages must precede all TRIM messages. Any pending restore batch is written before
 * the first TRIM message is processed, or after the iterator is exhausted if no TRIM messages
 * arrive. Restored/trimmed message and byte counts are published to `metrics` as they are processed.
 *
 * @param iterator changelog messages for `changelogSSP`, each tagged with a restore/trim mode
 * @throws IllegalStateException if the iterator switches from TRIM back to RESTORE mode
 * @throws InterruptedException if the current thread is interrupted; restoration stops early, but
 *                              entries read so far are still written and the store is flushed
 */
def restore(iterator: ChangelogSSPIterator): Unit = {
  info("Restoring entries for store: " + storeName + " in directory: " + storeDir.toString)
  // NOTE(review): the byte counters are Int and can overflow for very large stores; widening them
  // to Long depends on the gauge element types declared in the metrics class — confirm first.
  var restoredMessages = 0
  var restoredBytes = 0
  var trimmedMessages = 0
  var trimmedBytes = 0
  var previousMode = ChangelogSSPIterator.Mode.RESTORE
  val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize)
  var lastBatchFlushed = false

  // Writes any buffered restore entries to the store, at most once, and logs the restore total.
  def flushLastRestoreBatch(): Unit = {
    if (!lastBatchFlushed) {
      info(restoredMessages + " total entries restored for store: " + storeName + " in directory: " + storeDir.toString + ".")
      if (batch.size > 0) {
        doPutAll(rawStore, batch)
        batch.clear()
      }
      lastBatchFlushed = true
    }
  }

  while (iterator.hasNext && !Thread.currentThread().isInterrupted) {
    val envelope = iterator.next()
    val keyBytes = envelope.getKey.asInstanceOf[Array[Byte]]
    val valBytes = envelope.getMessage.asInstanceOf[Array[Byte]]
    val mode = iterator.getMode
    if (mode.equals(ChangelogSSPIterator.Mode.RESTORE)) {
      // Once trimming has started, the remaining messages must all be trimmed; going back would
      // mean restoring entries after their changelog values have already been overwritten.
      if (previousMode == ChangelogSSPIterator.Mode.TRIM) {
        throw new IllegalStateException(
          String.format("Illegal ChangelogSSPIterator mode change from TRIM to RESTORE for store: %s " +
            "in dir: %s with changelog SSP: %s.", storeName, storeDir, changelogSSP))
      }
      batch.add(new Entry(keyBytes, valBytes))
      if (batch.size >= batchSize) {
        doPutAll(rawStore, batch)
        batch.clear()
      }
      // update metrics
      restoredMessages += 1
      restoredBytes += keyBytes.length
      if (valBytes != null) restoredBytes += valBytes.length
      metrics.restoredMessagesGauge.set(restoredMessages)
      metrics.restoredBytesGauge.set(restoredBytes)
      // log progress every million messages
      if (restoredMessages % 1000000 == 0) {
        info(restoredMessages + " entries restored for store: " + storeName + " in directory: " + storeDir.toString + "...")
      }
    } else {
      // first write any open restore batches to store, so the lookup below sees restored values
      flushLastRestoreBatch()
      // then overwrite the value to be trimmed with its current store value
      val currentValBytes = rawStore.get(keyBytes)
      val changelogMessage = new OutgoingMessageEnvelope(
        changelogSSP.getSystemStream, changelogSSP.getPartition, keyBytes, currentValBytes)
      changelogCollector.send(changelogMessage)
      // update metrics
      trimmedMessages += 1
      trimmedBytes += keyBytes.length
      if (currentValBytes != null) trimmedBytes += currentValBytes.length
      metrics.trimmedMessagesGauge.set(trimmedMessages)
      metrics.trimmedBytesGauge.set(trimmedBytes)
      // log progress every hundred thousand messages
      if (trimmedMessages % 100000 == 0) {
        info(trimmedMessages + " entries trimmed for store: " + storeName + " in directory: " + storeDir.toString + "...")
      }
    }
    previousMode = mode
  }

  // if the last batch isn't flushed yet (e.g., for non transactional state, no messages to trim,
  // or an interrupt), flush it now
  flushLastRestoreBatch()
  info(trimmedMessages + " entries trimmed for store: " + storeName + " in directory: " + storeDir.toString + ".")
  // flush the store and the changelog producer
  flush() // TODO HIGH pmaheshw SAMZA-2338: Need a way to flush changelog producers. This only flushes the stores.
  if (Thread.currentThread().isInterrupted) {
    warn("Received an interrupt during store restoration. Exiting without restoring the full state.")
    throw new InterruptedException("Received an interrupt during store restoration.")
  }
}