public void atMostOnceShouldRestartFromNextOffset()

in cassandra/src/it/java/org/apache/pekko/projection/cassandra/CassandraProjectionTest.java [345:379]


  public void atMostOnceShouldRestartFromNextOffset() {
    // Scenario: an at-most-once projection whose handler throws "fail on 4" is run once,
    // then the same projection id is re-run with a non-failing handler. The test checks
    // that offset 4 is stored after the failure and that the second run appends only
    // "mno|pqr|" to the shared buffer, i.e. "jkl" never shows up in the output.
    final String randomEntityId = UUID.randomUUID().toString();
    final ProjectionId id = genRandomProjectionId();

    // Shared across both runs so the final assertion sees the combined output.
    final StringBuffer collected = new StringBuffer();

    Projection<Envelope> failingProjection =
        CassandraProjection.atMostOnce(
            id, sourceProvider(randomEntityId), () -> concatHandlerFail4(collected));

    // First run: expected to surface the handler's RuntimeException.
    RuntimeException handlerFailure = null;
    try {
      projectionTestKit.run(
          failingProjection, () -> assertEquals("abc|def|ghi|", collected.toString()));
    } catch (RuntimeException e) {
      handlerFailure = e;
    }
    if (handlerFailure == null) {
      // The run returned normally, so the failing handler was never triggered.
      Assert.fail("Expected exception");
    }
    assertEquals("fail on 4", handlerFailure.getMessage());

    // The offset of the failed envelope is what the store reports after the crash.
    assertStoredOffset(id, 4L);

    // Second run: same projection id, but with a handler that does not fail.
    Projection<Envelope> recoveredProjection =
        CassandraProjection.atMostOnce(
            id, sourceProvider(randomEntityId), () -> concatHandler(collected));

    // "jkl" (the envelope that failed in the first run) is not included in the output.
    projectionTestKit.run(
        recoveredProjection,
        () -> assertEquals("abc|def|ghi|mno|pqr|", collected.toString()));
  }