/**
 * Initial attributes of the stage: just its name.
 *
 * `Attributes.name` needs the name to attach; the value is the one used by the complete
 * definition of this member.
 */
override protected def initialAttributes: Attributes = Attributes.name("KubernetesHttpLogSource")

in core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala [496:605]


  /** Names this stage "KubernetesHttpLogSource" through its initial attributes. */
  override protected def initialAttributes: Attributes = {
    val stageName = "KubernetesHttpLogSource"
    Attributes.name(stageName)
  }

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogicWithLogging(shape) { logic =>

      // Lines that have been received but not yet pushed to `out`.
      private val queue: mutable.Queue[TypedLogLine] = mutable.Queue.empty

      // Starts as `sinceTime`; it is replaced by the time of the last line of every non-empty
      // response and fed back into `constructQuery` / `readLines` on later fetches.
      private var lastTimestamp = sinceTime

      /**
       * Issues one asynchronous GET for the container's logs.
       *
       * The URL is the client's master URL with its path replaced by
       * `constructPath(namespace, id)` and its query by
       * `constructQuery(lastTimestamp, waitForSentinel)`. The call is enqueued with a fresh
       * `LogFetchCallback`, so the result arrives through that callback rather than as a return value.
       *
       * Any non-fatal exception raised while building or enqueueing the request is reported via
       * `onFailure` and then rethrown to the caller.
       */
      def fetchLogs(): Unit = {
        try {
          val namespace = kubeRestClient.getNamespace
          val logPath = constructPath(namespace, id.asString)
          val logQuery = constructQuery(lastTimestamp, waitForSentinel)

          log.debug("*** Fetching K8S HTTP Logs w/ Path: {} Query: {}", logPath, logQuery)

          val target = Uri(kubeRestClient.getMasterUrl.toString).withPath(logPath).withQuery(logQuery)
          val httpGet = new Request.Builder().get().url(target.toString).build

          val pendingCall = kubeRestClient.getHttpClient.newCall(httpGet)
          pendingCall.enqueue(new LogFetchCallback())
        } catch {
          case NonFatal(cause) =>
            onFailure(cause)
            throw cause
        }
      }

      /**
       * Logs a log-retrieval failure; it neither rethrows nor changes the state of the stage.
       *
       * @param e the failure to report; socket timeouts are logged as warnings, everything else
       *          as an error together with the exception
       */
      def onFailure(e: Throwable): Unit = {
        e match {
          case _: SocketTimeoutException =>
            // this should only happen with follow behavior
            log.warning("* Logging socket to Kubernetes timed out.")
          case other =>
            log.error(other, "* Retrieving the logs from Kubernetes failed.")
        }
      }

      /**
       * Async entry point through which decoded lines are handed to the stage.
       *
       *  - no lines: schedule another fetch via `retryLogs`
       *  - lines and `out` is available: push the first line, buffer the remainder
       *  - lines but `out` is not available: buffer all of them
       */
      val emitCallback: AsyncCallback[Seq[TypedLogLine]] = getAsyncCallback[Seq[TypedLogLine]] { received =>
        received.headOption match {
          case None =>
            log.debug("* Empty lines returned.")
            retryLogs()
          case Some(head) if isAvailable(out) =>
            val pending = received.tail
            log.debug("* Lines Available & output ready; pushing {} (remaining: {})", head, pending)
            pushLine(head)
            queue ++= pending
          case Some(_) =>
            log.debug("* Output isn't ready; queueing lines: {}", received)
            queue ++= received
        }
      }

      /**
       * HTTP callback that `fetchLogs` enqueues with each request; it turns one response into log
       * lines and hands them to the stage through `emitCallback`.
       */
      class LogFetchCallback extends Callback {

        /**
         * Reports a failed call through the stage's `onFailure`, which only logs.
         *
         * NOTE(review): nothing is pushed, rescheduled or failed on this path, so the pull that
         * triggered the request stays unanswered — confirm that consumers bound their wait.
         */
        override def onFailure(call: Call, e: IOException): Unit = logic.onFailure(e)

        /**
         * Decodes the response body into lines, advances `lastTimestamp` to the time of the last
         * decoded line (if any) and passes the lines to `emitCallback`.
         *
         * A non-fatal exception is logged, reported through the stage's `onFailure` and rethrown.
         */
        override def onResponse(call: Call, response: Response): Unit =
          try {
            // Close the body source even when decoding throws, so that a failed read does not
            // leave the response open.
            val lines =
              try readLines(response.body.source, lastTimestamp)
              finally response.body.source.close()

            log.debug("* Read & decoded lines for K8S HTTP: {}", lines)

            lines.lastOption.foreach { line =>
              val newest = Option(line.time)
              log.debug("* Updating lastTimestamp (sinceTime) to {}", newest)
              lastTimestamp = newest
            }

            emitCallback.invoke(lines)
          } catch {
            case NonFatal(e) =>
              log.error(e, "* Reading Kubernetes HTTP Response failed.")
              logic.onFailure(e)
              throw e
          }
      }

      /**
       * Logs the line at debug level and pushes it to `out`.
       *
       * @param line the log line to emit downstream
       */
      def pushLine(line: TypedLogLine): Unit = {
        log.debug("* Pushing a chunk of kubernetes logging: {}", line)
        push(out, line)
      }

      // Downstream demand is served from the buffered lines first; a new HTTP read is only
      // started once the buffer is empty.
      setHandler(out, new OutHandler {
        override def onPull(): Unit =
          if (queue.isEmpty) {
            log.debug("* onPull, empty queue... fetching logs")
            fetchLogs()
          } else {
            log.debug("* onPull, nonEmpty queue... pushing line")
            pushLine(queue.dequeue())
          }
      })

      /**
       * Arms a one-shot `K8SRestLogRetry` timer that fires after `retryDelay`; `onTimer` reacts to
       * it by fetching again.
       */
      def retryLogs(): Unit = {
        log.debug("* Scheduling a retry of log fetch in {}", retryDelay)
        // Waiting before the next attempt keeps us from thrashing Kubernetes with HTTP requests.
        scheduleOnce(K8SRestLogRetry, retryDelay)
      }

      /**
       * Timer hook: the `K8SRestLogRetry` key starts a new log fetch; any other key is only
       * logged as a warning.
       */
      override protected def onTimer(timerKey: Any): Unit =
        if (K8SRestLogRetry == timerKey) {
          log.debug("* Timer trigger for log fetch retry")
          fetchLogs()
        } else {
          log.warning("* Got a timer trigger with an unknown key: {}", timerKey)
        }
    }