in kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala [76:117]
override def flush(): Unit = {
val contents = internalBytes.synchronized {
logger.trace("Getting content to flush")
val bytesToString = new String(internalBytes.toArray, EncodingType)
// Clear the internal buffer
internalBytes.clear()
// Stop the auto-flushing
disableAutoFlush()
bytesToString
}
// Avoid building and sending a kernel message if the contents (when
// trimmed) are empty and the flag to send anyway is disabled
if (!sendEmptyOutput && contents.trim.isEmpty) {
val contentsWithVisibleWhitespace = contents
.replace("\n", "\\n")
.replace("\t", "\\t")
.replace("\r", "\\r")
.replace(" ", "\\s")
logger.warn(s"Suppressing empty output: '$contentsWithVisibleWhitespace'")
return
}
logger.trace(s"Content to flush: '$contents'")
val streamContent = StreamContent(
streamType, contents
)
val kernelMessage = kmBuilder
.withIds(Seq(MessageType.Outgoing.Stream.toString.getBytes))
.withHeader(MessageType.Outgoing.Stream)
.withContentString(streamContent).build
actorLoader.load(SystemActorType.KernelMessageRelay) ! kernelMessage
// Ensure any underlying implementation is processed
super.flush()
}