app/packer/PackerProcessMonitor.scala (53 lines of code) (raw):
package packer
import java.io.{InputStreamReader, BufferedReader}
import event.EventBus
import event.BakeEvent.{AmiCreated, Log, PackerProcessExited}
import models.{BakeLog, BakeId}
import org.joda.time.DateTime
import scala.annotation.tailrec
import scala.concurrent.Promise
import scala.util.control.NonFatal
object PackerProcessMonitor {

  /** Monitors the given Packer process and consumes its output stream.
    *
    * Blocks the calling thread: reads the process's standard output line by
    * line until the stream ends, publishing an event on `eventBus` for every
    * line the parser recognises. It then waits for the process to exit,
    * publishes `PackerProcessExited` and completes `exitValuePromise` with the
    * process's exit value.
    *
    * If anything fails along the way the promise is failed with the cause.
    * This includes interruption of the monitoring thread: `NonFatal` does not
    * match `InterruptedException`, so it is handled explicitly to make sure the
    * promise is never left pending; the exception is then rethrown so the
    * interruption still propagates to the caller.
    *
    * @param process          the running Packer process to monitor
    * @param exitValuePromise completed with the exit value, or failed with the
    *                         error that stopped monitoring
    * @param bakeId           identifies the bake in every published event
    * @param eventBus         receives log, AMI-created and process-exited events
    */
  def monitorProcess(
      process: Process,
      exitValuePromise: Promise[Int],
      bakeId: BakeId,
      eventBus: EventBus
  ): Unit = {
    try {
      val bufferedReader = new BufferedReader(
        new InputStreamReader(process.getInputStream)
      )
      // Returns once the output stream has reached its end. That normally
      // coincides with the process exiting, but waitFor() below is what
      // actually guarantees the exit value is available.
      processNextLine(bufferedReader, bakeId, eventBus)
      process.waitFor()
      val exitValue = process.exitValue()
      eventBus.publish(PackerProcessExited(bakeId, exitValue))
      exitValuePromise.trySuccess(exitValue)
    } catch {
      case NonFatal(e) => exitValuePromise.tryFailure(e)
      case e: InterruptedException =>
        // Not matched by NonFatal: fail the promise so nobody waits forever,
        // then let the interruption propagate as it did before.
        exitValuePromise.tryFailure(e)
        throw e
    }
  }

  /** Reads lines from `reader` until end of stream, publishing an event for
    * each parsed result.
    *
    * `UiOutput` results are published as `Log` events carrying a sequence
    * number; the number advances by one per published log, so numbers stay
    * unique and consecutive. `AmiCreated` results are published as-is and do
    * not consume a log number.
    *
    * @param reader    source of the process output
    * @param bakeId    identifies the bake in every published event
    * @param eventBus  receives the published events
    * @param logNumber sequence number to assign to the next log entry
    */
  @tailrec
  private def processNextLine(
      reader: BufferedReader,
      bakeId: BakeId,
      eventBus: EventBus,
      logNumber: Int = 0
  ): Unit = {
    val line = reader.readLine()
    // readLine() returns null at end of stream, so stop recursing and return
    if (line != null) {
      // Thread the counter through the fold so each published log gets its
      // own number, however many results the parser yields for this line.
      val nextLogNumber =
        PackerOutputParser.parseLine(line).foldLeft(logNumber) {
          case (number, PackerOutputParser.UiOutput(logLevel, messageParts)) =>
            val bakeLog =
              BakeLog(bakeId, number, DateTime.now, logLevel, messageParts)
            eventBus.publish(Log(bakeId, bakeLog))
            number + 1
          case (number, PackerOutputParser.AmiCreated(amiId)) =>
            eventBus.publish(AmiCreated(bakeId, amiId))
            number
        }
      processNextLine(reader, bakeId, eventBus, nextLogNumber)
    }
  }
}