in bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/SanityTestCommand.java [98:187]
/**
 * Runs the bookie sanity test asynchronously.
 *
 * <p>The steps, each chained on the previous one, are:
 * <ol>
 *   <li>build a client configuration from {@code conf}, using
 *       {@code LocalBookieEnsemblePlacementPolicy} and {@code cmdFlags.timeout} for both the
 *       add-entry and read-entry timeouts;</li>
 *   <li>create a ledger (arguments {@code 1, 1}, MAC digest, empty password);</li>
 *   <li>add {@code cmdFlags.entries} entries whose payloads are {@code "entry-0"},
 *       {@code "entry-1"}, ...;</li>
 *   <li>close the write handle, re-open the ledger and check that its last-add-confirmed is
 *       {@code cmdFlags.entries - 1};</li>
 *   <li>read entries {@code 0 .. cmdFlags.entries - 1} through the re-opened handle and compare
 *       each payload with the expected string;</li>
 *   <li>hand the client and the ledger to {@code close(bk, lh)} for cleanup.</li>
 * </ol>
 *
 * @param conf     server configuration copied into the client configuration
 * @param cmdFlags supplies {@code entries} (number of entries to write and verify) and
 *                 {@code timeout} (add/read entry timeout)
 * @return a future completed with {@code true} when every step succeeds, or completed
 *         exceptionally with the failure of the first step that went wrong
 */
public static CompletableFuture<Boolean> handleAsync(ServerConfiguration conf, SanityFlags cmdFlags) {
    CompletableFuture<Boolean> result = new CompletableFuture<>();
    ClientConfiguration clientConf = new ClientConfiguration();
    clientConf.addConfiguration(conf);
    clientConf.setEnsemblePlacementPolicy(LocalBookieEnsemblePlacementPolicy.class);
    clientConf.setAddEntryTimeout(cmdFlags.timeout);
    clientConf.setReadEntryTimeout(cmdFlags.timeout);
    BookKeeper bk;
    try {
        bk = new BookKeeper(clientConf);
    } catch (BKException | IOException | InterruptedException e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to the caller's thread instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        LOG.warn("Failed to initialize bookkeeper client", e);
        result.completeExceptionally(e);
        return result;
    }
    bk.asyncCreateLedger(1, 1, BookKeeper.DigestType.MAC, new byte[0], (rc, lh, ctx) -> {
        if (rc != BKException.Code.OK) {
            LOG.warn("ledger creation failed for sanity command {}", rc);
            // No ledger exists to clean up, but the client still has to be released;
            // otherwise every failed sanity run leaks a BookKeeper client.
            try {
                bk.close();
            } catch (Exception closeEx) {
                if (closeEx instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                LOG.warn("Failed to close bookkeeper client", closeEx);
            } finally {
                // Always report the creation failure, even if closing the client failed.
                result.completeExceptionally(BKException.create(rc));
            }
            return;
        }
        // One future per entry so the ledger is only closed after every add has finished.
        List<CompletableFuture<Void>> entriesFutures = new ArrayList<>();
        for (int i = 0; i < cmdFlags.entries; i++) {
            String content = "entry-" + i;
            CompletableFuture<Void> entryFuture = new CompletableFuture<>();
            entriesFutures.add(entryFuture);
            lh.asyncAddEntry(content.getBytes(UTF_8), (arc, alh, entryId, actx) -> {
                if (arc != BKException.Code.OK) {
                    LOG.warn("ledger add entry failed for {}-{}", alh.getId(), arc);
                    entryFuture.completeExceptionally(BKException.create(arc));
                    return;
                }
                entryFuture.complete(null);
            }, null);
        }
        CompletableFuture<LedgerHandle> lhFuture = new CompletableFuture<>();
        CompletableFuture<Void> readEntryFuture = new CompletableFuture<>();
        FutureUtils.collect(entriesFutures).thenCompose(_r -> lh.closeAsync()).thenCompose(_r -> {
            bk.asyncOpenLedger(lh.getId(), BookKeeper.DigestType.MAC, new byte[0], (orc, olh, octx) -> {
                if (orc != BKException.Code.OK) {
                    LOG.warn("open sanity ledger failed for {}-{}", lh.getId(), orc);
                    lhFuture.completeExceptionally(BKException.create(orc));
                    return;
                }
                long lac = olh.getLastAddConfirmed();
                if (lac != (cmdFlags.entries - 1)) {
                    lhFuture.completeExceptionally(new Exception("Invalid last entry found on ledger. expecting: "
                            + (cmdFlags.entries - 1) + " -- found: " + lac));
                    return;
                }
                // Pass on the re-opened handle so the read-back goes through it rather than
                // through the already-closed write handle.
                lhFuture.complete(olh);
            }, null);
            return lhFuture;
        }).thenCompose(rlh -> {
            rlh.asyncReadEntries(0, cmdFlags.entries - 1, (rrc, rlh2, entries, rctx) -> {
                if (rrc != BKException.Code.OK) {
                    LOG.warn("reading sanity ledger failed for {}-{}", lh.getId(), rrc);
                    readEntryFuture.completeExceptionally(BKException.create(rrc));
                    return;
                }
                int i = 0;
                while (entries.hasMoreElements()) {
                    LedgerEntry entry = entries.nextElement();
                    String actualMsg = new String(entry.getEntry(), UTF_8);
                    String expectedMsg = "entry-" + (i++);
                    if (!expectedMsg.equals(actualMsg)) {
                        readEntryFuture.completeExceptionally(
                                new Exception("Failed validation of received message - Expected: " + expectedMsg
                                        + ", Actual: " + actualMsg));
                        return;
                    }
                }
                LOG.info("Read {} entries from ledger {}", i, lh.getId());
                LOG.info("Bookie sanity test succeeded");
                readEntryFuture.complete(null);
            }, null);
            return readEntryFuture;
        }).thenAccept(_r -> {
            close(bk, lh);
            result.complete(true);
        }).exceptionally(ex -> {
            close(bk, lh);
            // Failures from earlier stages normally arrive wrapped in a CompletionException.
            // Unwrap when possible, but never pass null: completeExceptionally(null) would
            // throw and leave the result future incomplete forever.
            Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
            result.completeExceptionally(cause);
            return null;
        });
    }, null);
    return result;
}