public void atLeastOnceShouldRestartFromPreviousOffset()

in cassandra/src/it/java/org/apache/pekko/projection/cassandra/CassandraProjectionTest.java [262:297]


  public void atLeastOnceShouldRestartFromPreviousOffset() {
    // Scenario: run an at-least-once projection that is expected to fail part-way through,
    // check the offset stored at that point, then run a second projection with the same
    // projection id and verify that the shared buffer ends up with the full expected content.
    final String entityId = UUID.randomUUID().toString();
    final ProjectionId id = genRandomProjectionId();

    // One buffer is shared by both runs, so the second run appends to the first run's output.
    final StringBuffer concatenated = new StringBuffer();

    final String expectedBeforeFailure = "abc|def|ghi|";
    final String expectedFailureMessage = "fail on 4";
    final String expectedAfterRestart = "abc|def|ghi|jkl|mno|pqr|";

    // First run uses the handler built by concatHandlerFail4.
    // NOTE(review): withSaveOffset(1, Duration.ZERO) presumably saves the offset after every
    // envelope - confirm against the AtLeastOnceProjection API.
    final Projection<Envelope> failingProjection =
        CassandraProjection.atLeastOnce(
                id, sourceProvider(entityId), () -> concatHandlerFail4(concatenated))
            .withSaveOffset(1, Duration.ZERO);

    // Capture the RuntimeException (if any) instead of failing inside the try block; an
    // AssertionError raised by the test kit is not a RuntimeException and still propagates.
    RuntimeException thrown = null;
    try {
      projectionTestKit.run(
          failingProjection, () -> assertEquals(expectedBeforeFailure, concatenated.toString()));
    } catch (RuntimeException e) {
      thrown = e;
    }
    if (thrown == null) {
      Assert.fail("Expected exception");
    }
    assertEquals(expectedFailureMessage, thrown.getMessage());

    // The offset stored for this projection id must be 3 before the second run starts.
    assertStoredOffset(id, 3L);

    // Second run: same projection id and source, but with the handler from concatHandler.
    final Projection<Envelope> resumedProjection =
        CassandraProjection.atLeastOnce(
                id, sourceProvider(entityId), () -> concatHandler(concatenated))
            .withSaveOffset(1, Duration.ZERO);

    projectionTestKit.run(
        resumedProjection, () -> assertEquals(expectedAfterRestart, concatenated.toString()));
  }