in cassandra/src/it/java/org/apache/pekko/projection/cassandra/CassandraProjectionTest.java [262:297]
public void atLeastOnceShouldRestartFromPreviousOffset() {
  // Scenario: an at-least-once projection is run with the handler from concatHandlerFail4 and is
  // expected to fail with "fail on 4"; a second projection sharing the same projection id and the
  // same output buffer is then run with the plain concatHandler and is expected to append the
  // remaining elements.
  final String entity = UUID.randomUUID().toString();
  final ProjectionId id = genRandomProjectionId();
  // Shared between both runs so the second run's output is appended to the first run's output.
  final StringBuffer concatenated = new StringBuffer();

  // First run: handler supplied by concatHandlerFail4, offset saved after every envelope.
  final Projection<Envelope> failingProjection =
      CassandraProjection.atLeastOnce(
              id, sourceProvider(entity), () -> concatHandlerFail4(concatenated))
          .withSaveOffset(1, Duration.ZERO);

  RuntimeException failure = null;
  try {
    projectionTestKit.run(
        failingProjection, () -> assertEquals("abc|def|ghi|", concatenated.toString()));
  } catch (RuntimeException e) {
    failure = e;
  }
  if (failure == null) {
    // The run completed without throwing, which this test treats as an error.
    Assert.fail("Expected exception");
  }
  assertEquals("fail on 4", failure.getMessage());

  // The offset stored for this projection id is expected to be 3 after the failed run.
  assertStoredOffset(id, 3L);

  // Second run: same projection id and buffer, but with the non-failing handler.
  final Projection<Envelope> resumedProjection =
      CassandraProjection.atLeastOnce(
              id, sourceProvider(entity), () -> concatHandler(concatenated))
          .withSaveOffset(1, Duration.ZERO);
  projectionTestKit.run(
      resumedProjection,
      () -> assertEquals("abc|def|ghi|jkl|mno|pqr|", concatenated.toString()));
}