in cassandra/src/it/java/org/apache/pekko/projection/cassandra/CassandraProjectionTest.java [382:414]
public void actorHandlerShouldStartStopActor() {
String entityId = UUID.randomUUID().toString();
ProjectionId projectionId = genRandomProjectionId();
TestProbe<Envelope> receiveProbe = testKit.createTestProbe();
TestProbe<Done> stopProbe = testKit.createTestProbe();
Projection<Envelope> projection =
CassandraProjection.atLeastOnce(
projectionId,
sourceProvider(entityId),
() ->
new TestActorHandler(
TestHandlerBehavior.create(receiveProbe.getRef(), stopProbe.getRef()),
testKit.system()))
.withSaveOffset(1, Duration.ZERO);
ActorRef<ProjectionBehavior.Command> projectionRef =
testKit.spawn(ProjectionBehavior.create(projection));
assertEquals("abc", receiveProbe.receiveMessage().message);
assertEquals("def", receiveProbe.receiveMessage().message);
assertEquals("ghi", receiveProbe.receiveMessage().message);
assertEquals("jkl", receiveProbe.receiveMessage().message);
assertEquals("mno", receiveProbe.receiveMessage().message);
assertEquals("pqr", receiveProbe.receiveMessage().message);
projectionRef.tell(ProjectionBehavior.stopMessage());
stopProbe.receiveMessage();
assertStoredOffset(projectionId, 6L);
}