def restore(iterator: ChangelogSSPIterator)

in samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala [126:218]


  /**
   * Restores this store's contents from its changelog, optionally trimming trailing entries.
   *
   * While the iterator reports `RESTORE` mode, each envelope's key/value is buffered and written to
   * `rawStore` in batches of `batchSize`. When the iterator reports `TRIM` mode, any still-pending
   * restore batch is written first. Then, for every trimmed envelope, the key's current value in
   * `rawStore` (possibly null) is sent back to the changelog SSP through `changelogCollector`,
   * overwriting the changelog entry being trimmed.
   *
   * Restored/trimmed message and byte counts are published to `metrics` as messages are processed.
   * The loop stops early if the current thread's interrupt flag is set. Whatever was restored up to
   * that point is still written and the store flushed before the interrupt is reported.
   *
   * @param iterator changelog iterator; its `getMode` is consulted after every `next()` to decide
   *                 whether the envelope is restored or trimmed
   * @throws IllegalStateException if the iterator switches from TRIM back to RESTORE mode
   * @throws InterruptedException  if the current thread's interrupt flag is set once restoration ends
   */
  def restore(iterator: ChangelogSSPIterator): Unit = {
    info("Restoring entries for store: " + storeName + " in directory: " + storeDir.toString)
    // NOTE(review): the byte counters are Int and could overflow for very large stores; widening them
    // to Long requires the metrics gauges to accept Long values — confirm gauge types before changing.
    var restoredMessages = 0
    var restoredBytes = 0
    var trimmedMessages = 0
    var trimmedBytes = 0
    var previousMode = ChangelogSSPIterator.Mode.RESTORE

    val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize)
    var lastBatchFlushed = false

    // Writes any buffered restore entries to the raw store, at most once. Invoked when the iterator
    // first enters TRIM mode, and again after the loop in case it never did.
    def flushLastRestoreBatch(): Unit = {
      if (!lastBatchFlushed) {
        info(restoredMessages + " total entries restored for store: " + storeName + " in directory: " + storeDir.toString + ".")
        if (!batch.isEmpty) {
          doPutAll(rawStore, batch)
          batch.clear()
        }
        lastBatchFlushed = true
      }
    }

    while (iterator.hasNext && !Thread.currentThread().isInterrupted) {
      val envelope = iterator.next()
      val keyBytes = envelope.getKey.asInstanceOf[Array[Byte]]
      val valBytes = envelope.getMessage.asInstanceOf[Array[Byte]]
      val mode = iterator.getMode

      if (mode == ChangelogSSPIterator.Mode.RESTORE) {
        // Once trimming has begun, every remaining message must also be trimmed.
        if (previousMode == ChangelogSSPIterator.Mode.TRIM) {
          throw new IllegalStateException(
            String.format("Illegal ChangelogSSPIterator mode change from TRIM to RESTORE for store: %s " +
              "in dir: %s with changelog SSP: %s.", storeName, storeDir, changelogSSP))
        }
        batch.add(new Entry(keyBytes, valBytes))

        if (batch.size >= batchSize) {
          doPutAll(rawStore, batch)
          batch.clear()
        }

        // update metrics
        restoredMessages += 1
        restoredBytes += keyBytes.length
        if (valBytes != null) restoredBytes += valBytes.length
        metrics.restoredMessagesGauge.set(restoredMessages)
        metrics.restoredBytesGauge.set(restoredBytes)

        // log progress every million messages
        if (restoredMessages % 1000000 == 0) {
          info(restoredMessages + " entries restored for store: " + storeName + " in directory: " + storeDir.toString + "...")
        }
      } else {
        // Write any open restore batch first, so the store reads below observe all restored data.
        flushLastRestoreBatch()

        // Overwrite the changelog entry being trimmed with the key's current store value.
        val currentValBytes = rawStore.get(keyBytes)
        val changelogMessage = new OutgoingMessageEnvelope(
          changelogSSP.getSystemStream, changelogSSP.getPartition, keyBytes, currentValBytes)
        changelogCollector.send(changelogMessage)

        // update metrics
        trimmedMessages += 1
        trimmedBytes += keyBytes.length
        if (currentValBytes != null) trimmedBytes += currentValBytes.length
        metrics.trimmedMessagesGauge.set(trimmedMessages)
        metrics.trimmedBytesGauge.set(trimmedBytes)

        // log progress every hundred thousand messages
        if (trimmedMessages % 100000 == 0) {
          info(trimmedMessages + " entries trimmed for store: " + storeName + " in directory: " + storeDir.toString + "...")
        }
      }

      previousMode = mode
    }

    // If the last batch isn't flushed yet (e.g., for non transactional state or no messages to trim), flush it now.
    flushLastRestoreBatch()
    info(trimmedMessages + " entries trimmed for store: " + storeName + " in directory: " + storeDir.toString + ".")

    // Flush the store. The changelog producer is NOT flushed here (see TODO).
    flush() // TODO HIGH pmaheshw SAMZA-2338: Need a way to flush changelog producers. This only flushes the stores.

    if (Thread.currentThread().isInterrupted) {
      warn("Received an interrupt during store restoration. Exiting without restoring the full state.")
      throw new InterruptedException("Received an interrupt during store restoration.")
    }
  }