in src/main/java/com/googlesource/gerrit/plugins/kinesis/CheckpointResetter.java [58:97]
/**
 * Resets the stored checkpoint of every shard listed in the consumer lease table of
 * {@code streamName} to {@code TRIM_HORIZON}.
 *
 * <p>Shards are updated one at a time, and each update is awaited for at most
 * {@code DYNAMODB_RESPONSE_TIMEOUT_SECS} seconds. The first failure (interruption, execution
 * error or timeout) is logged and stops the processing of the remaining shards; nothing is
 * propagated to the caller. When the calling thread is interrupted, its interrupt status is
 * restored before returning so that callers can still observe the interruption.
 *
 * @param streamName name of the stream whose lease table is looked up via
 *     {@code cosumerLeaseName(applicationName, streamName)}
 */
public void setAllShardsToBeginning(String streamName) {
  String leaseTable = cosumerLeaseName(configuration.getApplicationName(), streamName);

  // Every shard receives the same checkpoint value, so the update is built once up front.
  // No explicit action is set on the update; per the DynamoDB API the default action is PUT.
  Map<String, AttributeValueUpdate> updateValues = new HashMap<>();
  updateValues.put(
      LEASE_CHECKPOINT_ATTRIBUTE_VAUE,
      AttributeValueUpdate.builder()
          .value(AttributeValue.builder().s(TRIM_HORIZON.name().toUpperCase()).build())
          .build());

  try {
    for (String shard : getAllShards(leaseTable)) {
      logger.atInfo().log("[%s - %s] Resetting checkpoint", leaseTable, shard);

      // The lease row to update is addressed by the shard value under the lease key attribute.
      Map<String, AttributeValue> updateKey = new HashMap<>();
      updateKey.put(LEASE_KEY_ATTRIBUTE_NAME, AttributeValue.builder().s(shard).build());

      // ALL_OLD asks DynamoDB to return the item as it was before the update, so the
      // previous checkpoint ends up in the log line below.
      UpdateItemResponse updateItemResponse =
          dynamoDbAsyncClient
              .updateItem(
                  UpdateItemRequest.builder()
                      .tableName(leaseTable)
                      .key(updateKey)
                      .attributeUpdates(updateValues)
                      .returnValues(ReturnValue.ALL_OLD)
                      .build())
              .get(DYNAMODB_RESPONSE_TIMEOUT_SECS, TimeUnit.SECONDS);

      logger.atInfo().log(
          "[%s - %s] Successfully reset checkpoints. old value: %s",
          leaseTable, shard, updateItemResponse);
    }
  } catch (InterruptedException e) {
    // Restore the interrupt flag: catching InterruptedException clears it, and swallowing it
    // would hide the cancellation request from the caller/executor.
    Thread.currentThread().interrupt();
    logger.atWarning().withCause(e).log("%s resetOffset: interrupted", leaseTable);
  } catch (ExecutionException e) {
    logger.atSevere().withCause(e).log("%s resetOffset: Error", leaseTable);
  } catch (TimeoutException e) {
    logger.atSevere().withCause(e).log("%s resetOffset: Timeout", leaseTable);
  }
}