in testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestSourceProviderImpl.scala [89:98]
override def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] = {
implicit val ec = pekko.dispatch.ExecutionContexts.parasitic
val src =
if (allowCompletion) sourceEvents
else sourceEvents.concat(Source.maybe)
offset().map {
case Some(o) => src.dropWhile(env => startSourceFromFn(o, extractOffset(env)))
case _ => src
}
}