in core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/ContainerMessageConsumer.scala [55:130]
/**
 * Decodes one incoming message and dispatches it according to its parsed type.
 *
 * The payload is decoded as UTF-8 and handed to `ContainerMessage.parse`. Depending on the result:
 *  - a creation message for a warm-up action is only logged;
 *  - any other creation message triggers a lookup of the identity and the action; on success
 *    the action limits are checked and a `CreationContainer` request is sent to the container pool,
 *    on failure a `ContainerCreationAckMessage` carrying the error is sent to the scheduler;
 *  - a deletion message is forwarded to the container pool as `DeletionContainer`;
 *  - a parse failure or any other message type is logged as an error.
 *
 * Every branch finishes by sending `MessageFeed.Processed` to `feed`. For the regular creation
 * branch this happens asynchronously, once the lookups have either succeeded or been recovered.
 *
 * @param bytes the raw message payload, expected to be UTF-8 encoded text
 * @return a future that completes once the message has been dispatched; it does not wait for the
 *         asynchronous identity/action lookups started by a creation message
 */
private def handler(bytes: Array[Byte]): Future[Unit] = Future {
  val raw = new String(bytes, StandardCharsets.UTF_8)
  ContainerMessage.parse(raw) match {
    // Warm-up actions: nothing is sent to the container pool, the message is only logged.
    case Success(creation: ContainerCreationMessage) if isWarmUpAction(creation.action) =>
      logging.info(
        this,
        s"container creation message for ${creation.invocationNamespace}/${creation.action} is received (creationId: ${creation.creationId})")
      feed ! MessageFeed.Processed

    case Success(creation: ContainerCreationMessage) =>
      implicit val transid: TransactionId = creation.transid
      logging
        .info(this, s"container creation message for ${creation.invocationNamespace}/${creation.action} is received")
      // Fetch the identity first, then the action at the requested revision.
      val createContainer = for {
        identity <- Identity.get(authStore, EntityName(creation.invocationNamespace))
        action <- WhiskAction
          .get(entityStore, creation.action.toDocId, creation.revision, fromCache = true)
      } yield {
        // check action limits before creating container
        action.limits.checkLimits(identity)
        containerPool ! CreationContainer(creation, action)
        feed ! MessageFeed.Processed
      }
      // Any failure above (lookup error or an exception thrown in the yield block) ends up here
      // and is reported back to the scheduler instead of being dropped.
      createContainer.recover {
        case t =>
          // Error category carried in the ack: a limit violation is reported as such, anything
          // else is reported as a DB fetch error.
          val creationError = t match {
            case _: ActionLimitsException => InvalidActionLimitError
            case _                        => DBFetchError
          }
          // Human-readable message carried in the ack.
          val message = t match {
            case _: ActionLimitsException => t.getMessage // return generated failed message
            case _: NoDocumentException =>
              Messages.actionRemovedWhileInvoking
            case _: DocumentRevisionMismatchException =>
              Messages.actionMismatchWhileInvoking
            case e: Throwable =>
              logging.error(
                this,
                s"An unknown DB error occurred while fetching action ${creation.invocationNamespace}/${creation.action} for creation ${creation.creationId}, error: $e.")
              Messages.actionFetchErrorWhileInvoking
          }
          logging.error(
            this,
            s"failed to create a container ${creation.invocationNamespace}/${creation.action}, error: $message (creationId: ${creation.creationId})")
          val ack = ContainerCreationAckMessage(
            creation.transid,
            creation.creationId,
            creation.invocationNamespace,
            creation.action,
            creation.revision,
            creation.whiskActionMetaData,
            invokerInstanceId,
            creation.schedulerHost,
            creation.rpcPort,
            creation.retryCount,
            Some(creationError),
            Some(message))
          sendAckToScheduler(creation.rootSchedulerIndex, ack)
          feed ! MessageFeed.Processed
      }

    case Success(deletion: ContainerDeletionMessage) =>
      implicit val transid: TransactionId = deletion.transid
      logging.info(this, s"deletion message for ${deletion.invocationNamespace}/${deletion.action} is received")
      containerPool ! DeletionContainer(deletion)
      feed ! MessageFeed.Processed

    case Failure(t) =>
      // Log the decoded text: interpolating the byte array itself would only print the array's
      // identity string (e.g. "[B@1a2b3c"), not the payload that failed to parse.
      logging.error(this, s"Failed to parse $raw, error: ${t.getMessage}")
      feed ! MessageFeed.Processed

    case _ =>
      logging.error(this, s"Unexpected message received $raw")
      feed ! MessageFeed.Processed
  }
}