in core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala [329:406]
private def invokeConductor(user: Identity,
payload: Option[JsValue],
session: Session,
parentTid: TransactionId): Future[ActivationResponse] = {
implicit val transid: TransactionId = TransactionId.childOf(parentTid)
if (session.accounting.conductors > 2 * actionSequenceLimit) {
// composition is too long
Future.successful(ActivationResponse.applicationError(compositionIsTooLong))
} else {
// inject state into payload if any
val params: Option[JsValue] = payload match {
case Some(JsObject(fields)) =>
session.state
.map(state => Some(JsObject(JsObject(fields).fields ++ state.fields)))
.getOrElse(payload)
case _ => None
}
// invoke conductor action
session.accounting.conductors += 1
val activationResponse =
invokeSimpleAction(
user,
action = session.action,
payload = params,
waitForResponse = Some(session.action.limits.timeout.duration + 1.minute), // wait for result
cause = Some(session.activationId)) // cause is session id
waitForActivation(user, session, activationResponse).flatMap {
case Left(response) => // unsuccessful invocation, return error response
Future.successful(response)
case Right(activation) => // successful invocation
val result = activation.resultAsJson
// extract params from result, auto boxing result if not a dictionary
val params = result.fields.get(WhiskActivation.paramsField).map {
case obj: JsObject => obj
case value => JsObject(WhiskActivation.valueField -> value)
}
// update session state, auto boxing state if not a dictionary
session.state = result.fields.get(WhiskActivation.stateField).map {
case obj: JsObject => obj
case value => JsObject(WhiskActivation.stateField -> value)
}
// extract next action from result and invoke
result.fields.get(WhiskActivation.actionField) match {
case None =>
// no next action, end composition execution, return to caller
Future.successful(ActivationResponse(activation.response.statusCode, Some(params.getOrElse(result))))
case Some(next) =>
FullyQualifiedEntityName.resolveName(next, user.namespace.name) match {
case Some(fqn) if session.accounting.components < actionSequenceLimit =>
tryInvokeNext(user, fqn, params, session, transid)
case Some(_) => // composition is too long
invokeConductor(
user,
payload = Some(JsObject(ERROR_FIELD -> JsString(compositionIsTooLong))),
session = session,
transid)
case None => // parsing failure
invokeConductor(
user,
payload = Some(JsObject(ERROR_FIELD -> JsString(compositionComponentInvalid(next)))),
session = session,
transid)
}
}
}
}
}