private def handler()

in core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala [55:130]


  private def handler(bytes: Array[Byte]): Future[Unit] = Future {
    val raw = new String(bytes, StandardCharsets.UTF_8)
    ContainerMessage.parse(raw) match {
      case Success(creation: ContainerCreationMessage) if isWarmUpAction(creation.action) =>
        logging.info(
          this,
          s"container creation message for ${creation.invocationNamespace}/${creation.action} is received (creationId: ${creation.creationId})")
        feed ! MessageFeed.Processed

      case Success(creation: ContainerCreationMessage) =>
        implicit val transid: TransactionId = creation.transid
        logging
          .info(this, s"container creation message for ${creation.invocationNamespace}/${creation.action} is received")

        val createContainer = for {
          identity <- Identity.get(authStore, EntityName(creation.invocationNamespace))
          action <- WhiskAction
            .get(entityStore, creation.action.toDocId, creation.revision, fromCache = true)
        } yield {
          // check action limits before creating container
          action.limits.checkLimits(identity)
          containerPool ! CreationContainer(creation, action)
          feed ! MessageFeed.Processed
        }
        createContainer.recover {
          case t =>
            val creationError = t match {
              case _: ActionLimitsException => InvalidActionLimitError
              case _                        => DBFetchError
            }
            val message = t match {
              case _: ActionLimitsException => t.getMessage // return generated failed message
              case _: NoDocumentException =>
                Messages.actionRemovedWhileInvoking
              case _: DocumentRevisionMismatchException =>
                Messages.actionMismatchWhileInvoking
              case e: Throwable =>
                logging.error(
                  this,
                  s"An unknown DB error occurred while fetching action ${creation.invocationNamespace}/${creation.action} for creation ${creation.creationId}, error: $e.")
                Messages.actionFetchErrorWhileInvoking
            }
            logging.error(
              this,
              s"failed to create a container ${creation.invocationNamespace}/${creation.action}, error: $message (creationId: ${creation.creationId})")

            val ack = ContainerCreationAckMessage(
              creation.transid,
              creation.creationId,
              creation.invocationNamespace,
              creation.action,
              creation.revision,
              creation.whiskActionMetaData,
              invokerInstanceId,
              creation.schedulerHost,
              creation.rpcPort,
              creation.retryCount,
              Some(creationError),
              Some(message))
            sendAckToScheduler(creation.rootSchedulerIndex, ack)
            feed ! MessageFeed.Processed
        }
      case Success(deletion: ContainerDeletionMessage) =>
        implicit val transid: TransactionId = deletion.transid
        logging.info(this, s"deletion message for ${deletion.invocationNamespace}/${deletion.action} is received")
        containerPool ! DeletionContainer(deletion)
        feed ! MessageFeed.Processed
      case Failure(t) =>
        logging.error(this, s"Failed to parse $bytes, error: ${t.getMessage}")
        feed ! MessageFeed.Processed

      case _ =>
        logging.error(this, s"Unexpected message received $raw")
        feed ! MessageFeed.Processed
    }
  }