in http-testkit/src/main/scala/org/apache/pekko/http/scaladsl/testkit/RouteTestResultComponent.scala [30:122]
/**
 * Reports a test failure with the given message.
 *
 * Abstract here; the result type is `Nothing`, so a call never returns
 * normally to its caller. `RouteTestResult` uses it to signal conditions such as
 * a route that was rejected when a response was expected, or a route that
 * produced more than one result.
 *
 * NOTE(review): presumably implemented by the test-framework integration by
 * throwing that framework's failure exception; confirm against the implementers.
 *
 * @param msg human-readable description of the failure
 */
def failTest(msg: String): Nothing
/**
 * A receptacle for the response or rejections created by a route.
 *
 * The outcome is recorded at most once (see `handleResult` / `handleResponse`);
 * a second attempt fails the test. All reads and writes of the recorded outcome
 * happen inside `synchronized` blocks on this instance, and a latch is released
 * when the outcome arrives so that `awaitResult` can wait for it.
 *
 * @param timeout upper bound used when waiting for the route outcome and when
 *                draining entity data streams
 * @param fm      materializer used to run entity data streams
 */
class RouteTestResult(timeout: FiniteDuration)(implicit fm: Materializer) {
  // Recorded route outcome: Left = rejections, Right = response, None = nothing yet.
  // Only accessed while holding this instance's monitor.
  private[this] var result: Option[Either[immutable.Seq[Rejection], HttpResponse]] = None

  // Released exactly once, when the outcome is recorded.
  private[this] val latch = new CountDownLatch(1)

  /** `true` if an outcome has been recorded and it is a response (not a rejection). */
  def handled: Boolean = synchronized { result.exists(_.isRight) }

  /**
   * The rejections produced by the route.
   *
   * Fails the test if the route completed with a response instead, or if no
   * outcome has been recorded.
   */
  def rejections: immutable.Seq[Rejection] = synchronized {
    result match {
      case Some(Left(rejections)) => rejections
      case Some(Right(response))  => failTest(s"Request was not rejected, response was $response")
      case None                   => failNeitherCompletedNorRejected()
    }
  }

  /** The response produced by the route, carrying a re-created (replayable) entity. */
  def response: HttpResponse = rawResponse.withEntity(entity)

  /** Returns a "fresh" entity with a "fresh" unconsumed byte- or chunk stream (if not strict) */
  def entity: ResponseEntity = entityRecreator()

  /** All chunks of a chunked response entity, fully collected; `Nil` for any other entity kind. */
  def chunks: immutable.Seq[ChunkStreamPart] =
    entity match {
      case HttpEntity.Chunked(_, chunks) => awaitAllElements[ChunkStreamPart](chunks)
      case _                             => Nil
    }

  /**
   * The chunk stream of the raw (not re-created) response entity, or an empty
   * source if the entity is not chunked.
   */
  def chunksStream: Source[ChunkStreamPart, Any] =
    rawResponse.entity match {
      case HttpEntity.Chunked(_, data) => data
      case _                           => Source.empty
    }

  /** Applies `f` to this result, enabling `result ~> check`-style chaining. */
  def ~>[T](f: RouteTestResult => T): T = f(this)

  /**
   * The response exactly as recorded, without entity re-creation.
   *
   * Fails the test if the route rejected the request or if no outcome has been
   * recorded.
   */
  private[testkit] def rawResponse: HttpResponse = synchronized {
    result match {
      case Some(Right(response)) => response
      case Some(Left(Seq()))     => failTest("Request was rejected")
      // `Seq(...)` rather than `:: Nil`: the rejections are typed as `immutable.Seq`,
      // so a single rejection is recognised whatever the concrete collection is.
      case Some(Left(Seq(rejection))) => failTest(s"Request was rejected with rejection $rejection")
      case Some(Left(rejections))     => failTest(s"Request was rejected with rejections $rejections")
      case None                       => failNeitherCompletedNorRejected()
    }
  }

  /**
   * Records the outcome of running a route. Rejections are passed through
   * `RejectionHandler.applyTransformations` before being stored.
   *
   * Fails the test if an outcome was already recorded.
   */
  private[testkit] def handleResult(rr: RouteResult): Unit =
    recordOnce {
      rr match {
        case RouteResult.Complete(response)   => Right(response)
        case RouteResult.Rejected(rejections) => Left(RejectionHandler.applyTransformations(rejections))
      }
    }

  /**
   * Records a response as the outcome.
   *
   * Fails the test if an outcome was already recorded.
   */
  private[testkit] def handleResponse(r: HttpResponse): Unit =
    recordOnce(Right(r))

  /**
   * Blocks until an outcome has been recorded or `timeout` has elapsed.
   *
   * The result of the wait is deliberately ignored: if it timed out, the
   * accessors above fail the test with a "neither completed nor rejected" message.
   */
  private[testkit] def awaitResult: this.type = scala.concurrent.blocking {
    latch.await(timeout.toMillis, MILLISECONDS)
    this
  }

  // Stores the outcome and releases the latch, unless an outcome is already present.
  // `outcome` is by-name so it is only computed when it will actually be stored.
  private def recordOnce(outcome: => Either[immutable.Seq[Rejection], HttpResponse]): Unit =
    synchronized {
      if (result.isEmpty) {
        result = Some(outcome)
        latch.countDown()
      } else failTest("Route completed/rejected more than once")
    }

  // Built on first use. For non-strict entities the data stream of the raw response is
  // drained once into memory; every call of the resulting function then yields an
  // entity backed by a new source over those buffered elements.
  private[this] lazy val entityRecreator: () => ResponseEntity =
    rawResponse.entity match {
      case strict: HttpEntity.Strict =>
        () => strict

      case HttpEntity.Default(contentType, contentLength, data) =>
        val dataChunks = awaitAllElements(data)
        () => HttpEntity.Default(contentType, contentLength, Source(dataChunks))

      case HttpEntity.CloseDelimited(contentType, data) =>
        val dataChunks = awaitAllElements(data)
        () => HttpEntity.CloseDelimited(contentType, Source(dataChunks))

      case HttpEntity.Chunked(contentType, data) =>
        val dataChunks = awaitAllElements(data)
        () => HttpEntity.Chunked(contentType, Source(dataChunks))
    }

  private def failNeitherCompletedNorRejected(): Nothing =
    failTest(s"Request was neither completed nor rejected within $timeout")

  // Runs `data` to completion and collects its elements, capped via `limit(100000)`,
  // waiting for the collected result with `timeout`.
  private def awaitAllElements[T](data: Source[T, _]): immutable.Seq[T] =
    data.limit(100000).runWith(Sink.seq).awaitResult(timeout)
}