in testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestProjectionImpl.scala [65:132]
/**
 * Builds a new [[TestProjectionImpl]] from the given values. Each parameter defaults to the
 * value currently held by this instance, so callers only name the settings they want to replace.
 */
private def copy(
    projectionId: ProjectionId = projectionId,
    sourceProvider: SourceProvider[Offset, Envelope] = sourceProvider,
    handlerStrategy: HandlerStrategy = handlerStrategy,
    offsetStrategy: OffsetStrategy = offsetStrategy,
    statusObserver: StatusObserver[Envelope] = statusObserver,
    offsetStoreFactory: () => TestOffsetStore[Offset] = offsetStoreFactory,
    startOffset: Option[Offset] = startOffset): TestProjectionImpl[Offset, Envelope] = {
  new TestProjectionImpl[Offset, Envelope](
    projectionId,
    sourceProvider,
    handlerStrategy,
    offsetStrategy,
    statusObserver,
    offsetStoreFactory,
    startOffset)
}
/**
 * Returns a copy of this projection that uses `observer` as its status observer;
 * all other settings are carried over unchanged.
 */
override def withStatusObserver(observer: StatusObserver[Envelope]): TestProjectionImpl[Offset, Envelope] = {
  copy(statusObserver = observer)
}
/**
 * Returns a copy of this projection whose start offset is set to `offset`.
 */
def withStartOffset(offset: Offset): TestProjectionImpl[Offset, Envelope] = {
  val explicitStart: Option[Offset] = Some(offset)
  copy(startOffset = explicitStart)
}
/**
 * Returns a copy of this projection that obtains its offset store from `factory`.
 */
def withOffsetStoreFactory(factory: () => TestOffsetStore[Offset]): TestProjectionImpl[Offset, Envelope] = {
  copy(offsetStoreFactory = factory)
}
/**
 * Java API variant: wraps each store produced by the supplier in a `TestOffsetStoreAdapter`
 * and delegates to the function-based `withOffsetStoreFactory` overload.
 */
override def withOffsetStoreFactory(
    factory: Supplier[pekko.projection.testkit.javadsl.TestOffsetStore[Offset]])
    : javadsl.TestProjection[Offset, Envelope] = {
  // The supplier is only consulted when the resulting factory is invoked, not here.
  val adaptedFactory: () => TestOffsetStore[Offset] = () => new TestOffsetStoreAdapter(factory.get())
  withOffsetStoreFactory(adaptedFactory)
}
/**
 * INTERNAL API: Returns a copy of this projection that uses `strategy` as its offset strategy.
 */
@InternalApi
private[projection] def withOffsetStrategy(strategy: OffsetStrategy): TestProjectionImpl[Offset, Envelope] = {
  copy(offsetStrategy = strategy)
}
// FIXME: Should any of the following settings be exposed by the TestProjection?
/**
 * No-op in this implementation: `restartBackoff` is ignored and this same instance is returned.
 */
final override def withRestartBackoffSettings(
    restartBackoff: RestartSettings): TestProjectionImpl[Offset, Envelope] = {
  this
}
/**
 * No-op in this implementation: both arguments are ignored and this same instance is returned.
 */
override def withSaveOffset(afterEnvelopes: Int, afterDuration: FiniteDuration)
    : TestProjectionImpl[Offset, Envelope] = {
  this
}
/**
 * No-op in this implementation: both arguments are ignored and this same instance is returned.
 */
override def withGroup(groupAfterEnvelopes: Int, groupAfterDuration: FiniteDuration)
    : TestProjectionImpl[Offset, Envelope] = {
  this
}
/**
 * INTERNAL API: Exposes the `actorHandlerInit` of the configured handler strategy.
 */
@InternalApi
private[projection] def actorHandlerInit[T]: Option[ActorHandlerInit[T]] = {
  val fromStrategy: Option[ActorHandlerInit[T]] = handlerStrategy.actorHandlerInit
  fromStrategy
}
/**
 * INTERNAL API: To control the [[pekko.projection.internal.InternalProjectionState]] used in the projection.
 *
 * Every call invokes `offsetStoreFactory` and builds a new [[TestInternalProjectionState]]
 * from this projection's current settings.
 */
@InternalApi
private[projection] def newState(implicit system: ActorSystem[_]): TestInternalProjectionState[Offset, Envelope] = {
  // Ask the factory for the offset store that this state instance will use.
  val offsetStore = offsetStoreFactory()
  new TestInternalProjectionState(
    projectionId,
    sourceProvider,
    handlerStrategy,
    offsetStrategy,
    statusObserver,
    offsetStore,
    startOffset)
}
/**
 * Returns the cached projection state, creating it with `newState` on the first call and
 * storing it in `_state` for later calls.
 *
 * Once a state has been cached, the implicit `system` passed to later calls is not used.
 */
private def state(implicit system: ActorSystem[_]): TestInternalProjectionState[Offset, Envelope] =
  _state.getOrElse {
    // The by-name default only runs when nothing is cached yet: create the state and remember it.
    val created = newState
    _state = Some(created)
    created
  }