override def source()

in testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestSourceProviderImpl.scala [89:98]


  /**
   * Builds the stream of test envelopes, optionally skipping leading envelopes based on the supplied offset.
   *
   * When `allowCompletion` is false, `Source.maybe` is concatenated after `sourceEvents`, so the stream is
   * kept open once the predefined events have been emitted instead of completing with them.
   *
   * @param offset function that asynchronously supplies the offset to start from, if there is one
   * @return a future of the envelope source; when an offset is present, leading envelopes are dropped for as
   *         long as `startSourceFromFn` returns true for that offset and the envelope's extracted offset
   */
  override def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] = {
    // Built before `offset()` is invoked, and shared by both outcomes below.
    val events: Source[Envelope, NotUsed] =
      if (!allowCompletion) sourceEvents.concat(Source.maybe)
      else sourceEvents

    // Skips the leading run of envelopes that `startSourceFromFn` flags against the given starting offset.
    def resumeFrom(start: Offset): Source[Envelope, NotUsed] =
      events.dropWhile(envelope => startSourceFromFn(start, extractOffset(envelope)))

    // The mapping only selects an already-built source, so it is run on the parasitic execution context.
    offset().map {
      case Some(start) => resumeFrom(start)
      case _           => events
    }(pekko.dispatch.ExecutionContexts.parasitic)
  }