public void setAllShardsToBeginning()

in src/main/java/com/googlesource/gerrit/plugins/kinesis/CheckpointResetter.java [58:97]


  /**
   * Resets the stored checkpoint of every shard lease of the given stream to {@code
   * TRIM_HORIZON}.
   *
   * <p>The lease table name is derived from the configured application name and {@code
   * streamName}. For each shard returned by {@code getAllShards(leaseTable)} an update request is
   * issued against that table, and the call blocks for at most {@code
   * DYNAMODB_RESPONSE_TIMEOUT_SECS} seconds waiting for the response.
   *
   * <p>Failures are logged rather than propagated. The first interruption, execution error or
   * timeout ends the loop, so shards not yet processed keep their current checkpoint. On
   * interruption the thread's interrupt status is restored before returning.
   *
   * @param streamName name of the stream whose shard checkpoints are reset
   */
  public void setAllShardsToBeginning(String streamName) {
    String leaseTable = cosumerLeaseName(configuration.getApplicationName(), streamName);

    try {
      for (String shard : getAllShards(leaseTable)) {
        logger.atInfo().log("[%s - %s] Resetting checkpoint", leaseTable, shard);

        // The lease row is addressed by its shard identifier.
        Map<String, AttributeValue> updateKey = new HashMap<>();
        updateKey.put(LEASE_KEY_ATTRIBUTE_NAME, AttributeValue.builder().s(shard).build());

        // Overwrite the checkpoint attribute with the TRIM_HORIZON sentinel.
        Map<String, AttributeValueUpdate> updateValues = new HashMap<>();
        updateValues.put(
            LEASE_CHECKPOINT_ATTRIBUTE_VAUE,
            AttributeValueUpdate.builder()
                .value(AttributeValue.builder().s(TRIM_HORIZON.name().toUpperCase()).build())
                .build());

        // ALL_OLD is requested so the previous item content can be logged below.
        UpdateItemResponse updateItemResponse =
            dynamoDbAsyncClient
                .updateItem(
                    UpdateItemRequest.builder()
                        .tableName(leaseTable)
                        .key(updateKey)
                        .attributeUpdates(updateValues)
                        .returnValues(ReturnValue.ALL_OLD)
                        .build())
                .get(DYNAMODB_RESPONSE_TIMEOUT_SECS, TimeUnit.SECONDS);

        logger.atInfo().log(
            "[%s - %s] Successfully reset checkpoints. old value: %s",
            leaseTable, shard, updateItemResponse);
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up the stack can observe the interruption.
      Thread.currentThread().interrupt();
      logger.atWarning().withCause(e).log("%s resetOffset: interrupted", leaseTable);
    } catch (ExecutionException e) {
      logger.atSevere().withCause(e).log("%s resetOffset: Error", leaseTable);
    } catch (TimeoutException e) {
      logger.atSevere().withCause(e).log("%s resetOffset: Timeout", leaseTable);
    }
  }