in jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala [195:228]
override def withRestartBackoffSettings(restartBackoff: RestartSettings): JdbcProjectionImpl[Offset, Envelope, S] = {
  // Produce a copy of this projection with the given restart settings set explicitly;
  // every other field keeps its current value.
  val explicitBackoff = Some(restartBackoff)
  copy(restartBackoffOpt = explicitBackoff)
}
/**
* Settings for AtLeastOnceProjection (only applicable while the offset strategy is AtLeastOnce)
*/
override def withSaveOffset(
    afterEnvelopes: Int,
    afterDuration: FiniteDuration): JdbcProjectionImpl[Offset, Envelope, S] =
  // Returns a copy of this projection whose AtLeastOnce offset strategy carries the given
  // `afterEnvelopes` and `orAfterDuration` values. Pattern matching replaces the former
  // unchecked downcast so that misuse fails with an explicit message instead of a bare
  // ClassCastException.
  offsetStrategy match {
    case atLeastOnce: AtLeastOnce =>
      copy(offsetStrategy =
        atLeastOnce.copy(afterEnvelopes = Some(afterEnvelopes), orAfterDuration = Some(afterDuration)))
    case other =>
      // Any other strategy has no save-offset settings to update.
      throw new IllegalStateException(
        s"withSaveOffset requires an AtLeastOnce offset strategy, but found [${other.getClass.getName}]")
  }
/**
* Settings for GroupedProjection (only applicable while the handler strategy is a GroupedHandlerStrategy)
*/
override def withGroup(
    groupAfterEnvelopes: Int,
    groupAfterDuration: FiniteDuration): JdbcProjectionImpl[Offset, Envelope, S] =
  // Returns a copy of this projection whose grouped handler strategy carries the given
  // `afterEnvelopes` and `orAfterDuration` values. Pattern matching replaces the former
  // unchecked downcast so that misuse fails with an explicit message instead of a bare
  // ClassCastException.
  handlerStrategy match {
    // The Envelope type argument is erased at runtime (as it was for the original cast),
    // hence the explicit @unchecked marker.
    case grouped: GroupedHandlerStrategy[Envelope @unchecked] =>
      copy(handlerStrategy =
        grouped.copy(afterEnvelopes = Some(groupAfterEnvelopes), orAfterDuration = Some(groupAfterDuration)))
    case other =>
      // Any other handler strategy has no grouping settings to update.
      throw new IllegalStateException(
        s"withGroup requires a GroupedHandlerStrategy, but found [${other.getClass.getName}]")
  }
override def withRecoveryStrategy(
    recoveryStrategy: HandlerRecoveryStrategy): JdbcProjectionImpl[Offset, Envelope, S] = {
  // Built once: both strategies that take a recovery strategy receive the same Some(...).
  val chosenRecovery = Some(recoveryStrategy)
  copy(offsetStrategy = offsetStrategy match {
    case exactlyOnce: ExactlyOnce => exactlyOnce.copy(recoveryStrategy = chosenRecovery)
    case atLeastOnce: AtLeastOnce => atLeastOnce.copy(recoveryStrategy = chosenRecovery)
    // NOTE: AtMostOnce has its own withRecoveryStrategy variant
    // this method is not available for AtMostOnceProjection,
    // so an AtMostOnce strategy is passed through unchanged.
    case atMostOnce: AtMostOnce => atMostOnce
  })
}