in core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala [496:605]
override protected def initialAttributes: Attributes = Attributes.name("KubernetesHttpLogSource")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogicWithLogging(shape) { logic =>
private val queue = mutable.Queue.empty[TypedLogLine]
private var lastTimestamp = sinceTime
def fetchLogs(): Unit =
try {
val path = constructPath(kubeRestClient.getNamespace, id.asString)
val query = constructQuery(lastTimestamp, waitForSentinel)
log.debug("*** Fetching K8S HTTP Logs w/ Path: {} Query: {}", path, query)
val url = Uri(kubeRestClient.getMasterUrl.toString)
.withPath(path)
.withQuery(query)
val request = new Request.Builder().get().url(url.toString).build
kubeRestClient.getHttpClient.newCall(request).enqueue(new LogFetchCallback())
} catch {
case NonFatal(e) =>
onFailure(e)
throw e
}
def onFailure(e: Throwable): Unit = e match {
case _: SocketTimeoutException =>
log.warning("* Logging socket to Kubernetes timed out.") // this should only happen with follow behavior
case _ =>
log.error(e, "* Retrieving the logs from Kubernetes failed.")
}
val emitCallback: AsyncCallback[Seq[TypedLogLine]] = getAsyncCallback[Seq[TypedLogLine]] {
case lines @ firstLine +: restOfLines =>
if (isAvailable(out)) {
log.debug("* Lines Available & output ready; pushing {} (remaining: {})", firstLine, restOfLines)
pushLine(firstLine)
queue ++= restOfLines
} else {
log.debug("* Output isn't ready; queueing lines: {}", lines)
queue ++= lines
}
case Nil =>
log.debug("* Empty lines returned.")
retryLogs()
}
class LogFetchCallback extends Callback {
override def onFailure(call: Call, e: IOException): Unit = logic.onFailure(e)
override def onResponse(call: Call, response: Response): Unit =
try {
val lines = readLines(response.body.source, lastTimestamp)
log.debug("* Read & decoded lines for K8S HTTP: {}", lines)
response.body.source.close()
lines.lastOption.foreach { line =>
log.debug("* Updating lastTimestamp (sinceTime) to {}", Option(line.time))
lastTimestamp = Option(line.time)
}
emitCallback.invoke(lines)
} catch {
case NonFatal(e) =>
log.error(e, "* Reading Kubernetes HTTP Response failed.")
logic.onFailure(e)
throw e
}
}
def pushLine(line: TypedLogLine): Unit = {
log.debug("* Pushing a chunk of kubernetes logging: {}", line)
push(out, line)
}
setHandler(
out,
new OutHandler {
override def onPull(): Unit = {
// if we still have lines queued up, return those; else make a new HTTP read.
if (queue.nonEmpty) {
log.debug("* onPull, nonEmpty queue... pushing line")
pushLine(queue.dequeue())
} else {
log.debug("* onPull, empty queue... fetching logs")
fetchLogs()
}
}
})
def retryLogs(): Unit = {
// Pause before retrying so we don't thrash Kubernetes w/ HTTP requests
log.debug("* Scheduling a retry of log fetch in {}", retryDelay)
scheduleOnce(K8SRestLogRetry, retryDelay)
}
override protected def onTimer(timerKey: Any): Unit = timerKey match {
case K8SRestLogRetry =>
log.debug("* Timer trigger for log fetch retry")
fetchLogs()
case x =>
log.warning("* Got a timer trigger with an unknown key: {}", x)
}
}