in jdbc/src/main/scala/org/apache/pekko/projection/jdbc/javadsl/JdbcProjection.scala [174:203]
// Java DSL factory for a grouped JDBC projection.
//
// Adapts the Java-facing arguments (the `Supplier`s and the source provider) and
// passes them to `JdbcProjectionImpl` together with an offset store created from
// the same session factory. The projection is built with the `ExactlyOnce()` offset
// strategy, a `GroupedHandlerStrategy` wrapping the adapted handler, no settings
// override, no restart-backoff override and the `NoopStatusObserver`.
//
// `sessionCreator` is invoked each time a session is requested through the factory;
// `handler` is invoked each time the handler factory is called.
// NOTE(review): when and how often those factories are called is decided by
// JdbcProjectionImpl / the offset store, which are not visible here.
def groupedWithin[Offset, Envelope, S <: JdbcSession](
    projectionId: ProjectionId,
    sourceProvider: SourceProvider[Offset, Envelope],
    sessionCreator: Supplier[S],
    handler: Supplier[JdbcHandler[java.util.List[Envelope], S]],
    system: ActorSystem[_]): GroupedProjection[Offset, Envelope] = {
  // Expose the Java Supplier as a Scala thunk so it can be shared by the
  // offset store, the handler adapter and the projection itself.
  val newSession: () => S = () => sessionCreator.get()

  // Wrap the Java DSL source provider for use by the implementation.
  val adaptedSource = new SourceProviderAdapter(sourceProvider)

  // The offset store is created once and reused below by both the adapted
  // handler and the projection instance.
  val store = JdbcProjectionImpl.createOffsetStore(newSession)(system)

  // Each call to the factory asks the user's Supplier for a handler and wraps it
  // in the adapter that accepts grouped envelopes.
  val groupedHandler = JdbcProjectionImpl.adaptedHandlerForGrouped(
    projectionId,
    adaptedSource,
    newSession,
    () => new GroupedJdbcHandlerAdapter(handler.get()),
    store)

  new JdbcProjectionImpl(
    projectionId,
    adaptedSource,
    sessionFactory = newSession,
    settingsOpt = None,
    restartBackoffOpt = None,
    ExactlyOnce(),
    GroupedHandlerStrategy(groupedHandler),
    NoopStatusObserver,
    store)
}