in ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java [117:183]
public static void main(String[] args) throws Exception {
VerificationTool tool = new VerificationTool();
JCommander jc = JCommander.newBuilder()
.addObject(tool)
.build();
jc.parse(args);
if (tool.help) {
jc.usage();
return;
}
System.out.println(tool.metaQuorum);
try (LogServiceClient client = new LogServiceClient(tool.metaQuorum)) {
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<?>> futures = new ArrayList<Future<?>>(tool.numLogs);
if (tool.write) {
LOG.info("Executing parallel writes");
// Delete any logs that already exist first
final Set<LogName> logsInSystem = new HashSet<>();
List<LogInfo> listOfLogs = client.listLogs();
for (LogInfo logInfo : listOfLogs) {
logsInSystem.add(logInfo.getLogName());
}
LOG.info("Observed logs already in system: {}", logsInSystem);
for (int i = 0; i < tool.numLogs; i++) {
LogName logName = getLogName(i);
if (logsInSystem.contains(logName)) {
LOG.info("Deleting {}", logName);
client.deleteLog(logName);
}
}
// First write batch entries to log.
if(tool.batchSize > 0) {
// Compute the number of batches to write given the batch size.
int numBatches = tool.numRecords / tool.batchSize;
for (int i = 0; i < tool.numLogs; i++) {
BatchWriter writer = new BatchWriter(getLogName(i), client, tool.numRecords,
tool.logFrequency, tool.recordSize, tool.batchSize, numBatches);
futures.add(executor.submit(writer));
}
} else {
// Write single entries to log.
for (int i = 0; i < tool.numLogs; i++) {
BulkWriter writer = new BulkWriter(getLogName(i), client, tool.numRecords,
tool.logFrequency, tool.recordSize);
futures.add(executor.submit(writer));
}
}
waitForCompletion(futures);
}
if (tool.read) {
LOG.info("Executing parallel reads");
futures = new ArrayList<Future<?>>(tool.numLogs);
for (int i = 0; i < tool.numLogs; i++) {
BulkReader reader = new BulkReader(getLogName(i), client, tool.numRecords, tool.logFrequency,
tool.recordSize);
futures.add(executor.submit(reader));
}
waitForCompletion(futures);
}
executor.shutdownNow();
}
}