in core/controller/src/main/scala/org/apache/openwhisk/core/controller/WebActions.scala [661:710]
/**
 * Completes the HTTP request once the (possibly queued) activation future resolves.
 *
 * Outcomes handled here:
 *  - `Right(activation)` that is a success: the result is looked up with an empty field path
 *    (presumably selecting the entire result -- see `getFieldPath`) and handed to the
 *    transcoder of `responseType`.
 *  - `Right(activation)` that is an application error: the `ActivationResponse.ERROR_FIELD`
 *    property is projected out of the result and transcoded; if the projection yields
 *    nothing the request terminates with `NotFound`.
 *  - `Right(activation)` with any other status: terminates with `BadRequest`.
 *  - `Left(activationId)`: the activation did not finish in time; terminates with `Accepted`.
 *  - Failed future: `RejectRequest` is mapped to its own code and message,
 *    `LoadBalancerException` to `ServiceUnavailable`, anything else to `InternalServerError`.
 *
 * Responses that carry an activation id are wrapped in `respondWithActivationIdHeader`.
 *
 * @param queuedActivation future yielding either the id of a still-pending activation (Left)
 *                         or the completed activation record (Right)
 * @param responseType     media extension whose transcoder renders the projected result
 * @param transid          transaction id used for logging and passed to the transcoder
 */
private def completeRequest(queuedActivation: Future[Either[ActivationId, WhiskActivation]],
responseType: MediaExtension)(implicit transid: TransactionId) = {
  onComplete(queuedActivation) {
    case Success(Right(activation)) =>
      respondWithActivationIdHeader(activation.activationId) {
        // bind the JSON result once; it is reused for the projection below
        val activationResult = activation.resultAsJson
        if (activation.response.isSuccess || activation.response.isApplicationError) {
          val resultPath = if (activation.response.isSuccess) {
            List.empty
          } else {
            // the activation produced an error response, so look for an error property
            // in the response, unwrap it and use it to terminate the response
            List(ActivationResponse.ERROR_FIELD)
          }
          getFieldPath(activationResult, resultPath) match {
            case Some(projection) =>
              // run the transcoder inside a Future so that an exception it throws
              // surfaces as a Failure here instead of escaping the route
              val marshaler = Future(responseType.transcoder(projection, transid, webApiDirectives))
              onComplete(marshaler) {
                case Success(done) => done // all transcoders terminate the connection
                case Failure(t) =>
                  // record the cause; otherwise the 500 response is undiagnosable
                  logging.error(this, s"exception in completeRequest while transcoding response: $t")
                  terminate(InternalServerError)
              }
            case _ => terminate(NotFound, Messages.propertyNotFound)
          }
        } else {
          terminate(BadRequest, Messages.errorProcessingRequest)
        }
      }
    case Success(Left(activationId)) =>
      // blocking invoke which got queued instead
      // this should not happen, instead it should be a blocking invoke timeout
      logging.debug(this, "activation waiting period expired")
      respondWithActivationIdHeader(activationId) {
        terminate(Accepted, Messages.responseNotReady)
      }
    case Failure(t: RejectRequest) => terminate(t.code, t.message)
    case Failure(t: LoadBalancerException) =>
      logging.error(this, s"failed in loadbalancer: $t")
      terminate(ServiceUnavailable)
    case Failure(t) =>
      logging.error(this, s"exception in completeRequest: $t")
      terminate(InternalServerError)
  }
}