in testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestSourceProviderImpl.scala [46:87]
private def copy(
sourceEvents: Source[Envelope, NotUsed] = sourceEvents,
extractOffsetFn: Envelope => Offset = extractOffsetFn,
extractCreationTimeFn: Envelope => Long = extractCreationTimeFn,
verifyOffsetFn: Offset => OffsetVerification = verifyOffsetFn,
startSourceFromFn: (Offset, Offset) => Boolean = startSourceFromFn,
allowCompletion: Boolean = allowCompletion): TestSourceProviderImpl[Offset, Envelope] =
new TestSourceProviderImpl(
sourceEvents,
extractOffsetFn,
extractCreationTimeFn,
verifyOffsetFn,
startSourceFromFn,
allowCompletion)
def withExtractCreationTimeFunction(
extractCreationTimeFn: Envelope => Long): TestSourceProviderImpl[Offset, Envelope] =
copy(extractCreationTimeFn = extractCreationTimeFn)
def withExtractCreationTimeFunction(
extractCreationTimeFn: java.util.function.Function[Envelope, Long]): TestSourceProviderImpl[Offset, Envelope] =
withExtractCreationTimeFunction(extractCreationTimeFn.asScala)
def withAllowCompletion(allowCompletion: Boolean): TestSourceProviderImpl[Offset, Envelope] =
copy(allowCompletion = allowCompletion)
def withOffsetVerification(verifyOffsetFn: Offset => OffsetVerification): TestSourceProviderImpl[Offset, Envelope] =
copy(verifyOffsetFn = verifyOffsetFn)
override def withOffsetVerification(offsetVerificationFn: java.util.function.Function[Offset, OffsetVerification])
: TestSourceProviderImpl[Offset, Envelope] =
withOffsetVerification(offsetVerificationFn.asScala)
def withStartSourceFrom(startSourceFromFn: (Offset, Offset) => Boolean): TestSourceProviderImpl[Offset, Envelope] =
copy(startSourceFromFn = startSourceFromFn)
def withStartSourceFrom(startSourceFromFn: java.util.function.BiFunction[Offset, Offset, java.lang.Boolean])
: TestSourceProviderImpl[Offset, Envelope] = {
val adapted: (Offset, Offset) => Boolean = (lastOffset, offset) =>
Boolean.box(startSourceFromFn.apply(lastOffset, offset))
withStartSourceFrom(adapted)
}