in core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala [152:215]
/**
 * Builds an activation message for the given action and publishes it to the load balancer.
 *
 * The arguments sent with the activation depend on the shape of the payload:
 *  - a JSON object is merged with the action's parameters,
 *  - a JSON array is forwarded unchanged,
 *  - anything else (including no payload) results in the action's parameters only.
 *
 * Two log markers are started on the transaction: one spanning the whole activation
 * (a different marker is used for blocking and non-blocking requests) and one spanning
 * the hand-off to the load balancer. Each is finished or failed once the matching
 * future completes.
 *
 * @param user the identity on whose behalf the action is invoked
 * @param action the action to invoke
 * @param payload optional payload supplied by the caller
 * @param waitForResponse if defined, the request is blocking and this is the time to wait
 *                        for the activation response; if empty, the request is non-blocking
 * @param cause optional activation id recorded as the cause in the activation message
 * @param transid the transaction id used for logging and tracing
 * @return a future that completes with `Left(activationId)` for a non-blocking request,
 *         or with whatever `waitForActivationResponse` yields for a blocking request
 */
private def invokeSimpleAction(
  user: Identity,
  action: ExecutableWhiskActionMetaData,
  payload: Option[JsValue],
  waitForResponse: Option[FiniteDuration],
  cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {

  // merge package parameters with action (action parameters supersede), then merge in payload;
  // array payloads bypass the merge and are passed through as they are
  val args: Option[JsValue] = payload match {
    case Some(obj: JsObject) => action.parameters merge Some(obj)
    case Some(arr: JsArray)  => Some(arr)
    case _                   => Some(action.parameters.toJsObject)
  }

  val activationId = activationIdFactory.make()

  // a defined wait duration is what marks the request as blocking
  val isBlocking = waitForResponse.isDefined
  val activationMarker =
    if (isBlocking) LoggingMarkers.CONTROLLER_ACTIVATION_BLOCKING
    else LoggingMarkers.CONTROLLER_ACTIVATION

  val startActivation = transid.started(this, activationMarker, logLevel = InfoLevel)
  val startLoadbalancer =
    transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action activation id: ${activationId}")

  // top-level keys of an object payload; empty for every other kind of payload
  val payloadKeys: Set[String] = payload match {
    case Some(JsObject(fields)) => fields.keySet
    case _                      => Set.empty[String]
  }

  val message = ActivationMessage(
    transid,
    FullyQualifiedEntityName(action.namespace, action.name, Some(action.version), action.binding),
    action.rev,
    user,
    activationId, // activation id created here
    activeAckTopicIndex,
    isBlocking,
    args,
    action.parameters.initParameters,
    action.parameters.lockedParameters(payloadKeys),
    cause = cause,
    WhiskTracerProvider.tracer.getTraceContext(transid))

  // hand the message to the load balancer and close the load balancer marker accordingly
  val published = loadBalancer.publish(action, message).andThen {
    case Success(_) => transid.finished(this, startLoadbalancer)
    case Failure(e) => transid.failed(this, startLoadbalancer, e.getMessage)
  }

  val result: Future[Either[ActivationId, WhiskActivation]] = published.flatMap { activeAckResponse =>
    waitForResponse match {
      // blocking: wait for the activation response from the message bus
      // (known as the active response or active ack)
      case Some(timeout) =>
        waitForActivationResponse(user, message.activationId, timeout, activeAckResponse)
      // non-blocking: return the activation id right away
      case None =>
        Future.successful(Left(message.activationId))
    }
  }

  // close the activation marker once the overall outcome is known
  result.andThen {
    case Success(_) => transid.finished(this, startActivation)
    case Failure(e) => transid.failed(this, startActivation, e.getMessage)
  }
}