in cassandra/src/it/java/org/apache/pekko/projection/cassandra/CassandraProjectionTest.java [345:379]
public void atMostOnceShouldRestartFromNextOffset() {
String entityId = UUID.randomUUID().toString();
ProjectionId projectionId = genRandomProjectionId();
StringBuffer str = new StringBuffer();
Projection<Envelope> projection =
CassandraProjection.atMostOnce(
projectionId, sourceProvider(entityId), () -> concatHandlerFail4(str));
try {
projectionTestKit.run(
projection,
() -> {
assertEquals("abc|def|ghi|", str.toString());
});
Assert.fail("Expected exception");
} catch (RuntimeException e) {
assertEquals("fail on 4", e.getMessage());
}
assertStoredOffset(projectionId, 4L);
// re-run projection without failing function
Projection<Envelope> projection2 =
CassandraProjection.atMostOnce(
projectionId, sourceProvider(entityId), () -> concatHandler(str));
projectionTestKit.run(
projection2,
() -> {
// failed: jkl not included
assertEquals("abc|def|ghi|mno|pqr|", str.toString());
});
}