public static CompletableFuture handleAsync()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/SanityTestCommand.java [98:187]


    /**
     * Runs the bookie sanity test asynchronously.
     *
     * <p>The test builds a BookKeeper client from {@code conf} (using
     * {@code LocalBookieEnsemblePlacementPolicy} and the add/read timeouts from
     * {@code cmdFlags.timeout}), then:
     * <ol>
     *   <li>creates a ledger with ensemble size 1 and quorum size 1;</li>
     *   <li>adds {@code cmdFlags.entries} entries whose payloads are {@code "entry-<i>"};</li>
     *   <li>closes the ledger and reopens it, checking that the last-add-confirmed
     *       entry id equals {@code cmdFlags.entries - 1};</li>
     *   <li>reads every entry back and compares it with the expected payload.</li>
     * </ol>
     * Whatever the outcome, the client is released: via {@code close(bk, lh)} once a
     * ledger exists, or via {@code bk.close()} when ledger creation itself failed.
     *
     * @param conf     server configuration that is copied into the client configuration
     * @param cmdFlags sanity-test flags; {@code timeout} and {@code entries} are read
     * @return a future completed with {@code true} when every step succeeded, or
     *         completed exceptionally with the failure of the first step that went wrong
     */
    public static CompletableFuture<Boolean> handleAsync(ServerConfiguration conf, SanityFlags cmdFlags) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        ClientConfiguration clientConf = new ClientConfiguration();
        clientConf.addConfiguration(conf);
        clientConf.setEnsemblePlacementPolicy(LocalBookieEnsemblePlacementPolicy.class);
        clientConf.setAddEntryTimeout(cmdFlags.timeout);
        clientConf.setReadEntryTimeout(cmdFlags.timeout);

        BookKeeper bk;
        try {
            bk = new BookKeeper(clientConf);
        } catch (BKException | IOException | InterruptedException e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for code further up the stack.
                Thread.currentThread().interrupt();
            }
            LOG.warn("Failed to initialize bookkeeper client", e);
            result.completeExceptionally(e);
            return result;
        }

        bk.asyncCreateLedger(1, 1, BookKeeper.DigestType.MAC, new byte[0], (rc, lh, ctx) -> {
            if (rc != BKException.Code.OK) {
                LOG.warn("ledger creation failed for sanity command {}", rc);
                // No ledger exists to clean up, but the client must still be released;
                // otherwise it leaks on this failure path.
                try {
                    bk.close();
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    LOG.warn("Failed to close bookkeeper client after ledger creation failure", e);
                }
                result.completeExceptionally(BKException.create(rc));
                return;
            }

            // Step 1: issue all writes; one future per entry tracks its add callback.
            List<CompletableFuture<Void>> entriesFutures = new ArrayList<>();
            for (int i = 0; i < cmdFlags.entries; i++) {
                String content = "entry-" + i;
                CompletableFuture<Void> entryFuture = new CompletableFuture<>();
                entriesFutures.add(entryFuture);
                lh.asyncAddEntry(content.getBytes(UTF_8), (arc, alh, entryId, actx) -> {
                    if (arc != BKException.Code.OK) {
                        LOG.warn("ledger add entry failed for {}-{}", alh.getId(), arc);
                        entryFuture.completeExceptionally(BKException.create(arc));
                        return;
                    }
                    entryFuture.complete(null);
                }, null);
            }

            CompletableFuture<LedgerHandle> lhFuture = new CompletableFuture<>();
            CompletableFuture<Void> readEntryFuture = new CompletableFuture<>();
            // Step 2: once every add has completed, close the write handle and reopen the ledger.
            FutureUtils.collect(entriesFutures).thenCompose(_r -> lh.closeAsync()).thenCompose(_r -> {
                bk.asyncOpenLedger(lh.getId(), BookKeeper.DigestType.MAC, new byte[0], (orc, olh, octx) -> {
                    if (orc != BKException.Code.OK) {
                        LOG.warn("open sanity ledger failed for {}-{}", lh.getId(), orc);
                        lhFuture.completeExceptionally(BKException.create(orc));
                        return;
                    }
                    long lac = olh.getLastAddConfirmed();
                    if (lac != (cmdFlags.entries - 1)) {
                        lhFuture.completeExceptionally(new Exception("Invalid last entry found on ledger. expecting: "
                                + (cmdFlags.entries - 1) + " -- found: " + lac));
                        return;
                    }
                    // Pass on the reopened handle (not the closed write handle) so the
                    // read-back below goes through the handle that was just verified.
                    lhFuture.complete(olh);
                }, null);
                return lhFuture;
            }).thenCompose(rlh -> {
                // Step 3: read every entry back and compare it with what was written.
                rlh.asyncReadEntries(0, cmdFlags.entries - 1, (rrc, rlh2, entries, rctx) -> {
                    if (rrc != BKException.Code.OK) {
                        LOG.warn("reading sanity ledger failed for {}-{}", lh.getId(), rrc);
                        readEntryFuture.completeExceptionally(BKException.create(rrc));
                        return;
                    }
                    int i = 0;
                    while (entries.hasMoreElements()) {
                        LedgerEntry entry = entries.nextElement();
                        String actualMsg = new String(entry.getEntry(), UTF_8);
                        String expectedMsg = "entry-" + (i++);
                        if (!expectedMsg.equals(actualMsg)) {
                            readEntryFuture.completeExceptionally(
                                    new Exception("Failed validation of received message - Expected: " + expectedMsg
                                            + ", Actual: " + actualMsg));
                            return;
                        }
                    }
                    LOG.info("Read {} entries from ledger {}", i, lh.getId());
                    LOG.info("Bookie sanity test succeeded");
                    readEntryFuture.complete(null);
                }, null);
                return readEntryFuture;
            }).thenAccept(_r -> {
                close(bk, lh);
                result.complete(true);
            }).exceptionally(ex -> {
                close(bk, lh);
                // Failures from earlier stages arrive wrapped; report the underlying cause.
                // Fall back to the exception itself when there is no cause, because
                // completeExceptionally(null) would throw and leave the result pending.
                Throwable cause = ex.getCause();
                result.completeExceptionally(cause != null ? cause : ex);
                return null;
            });
        }, null);
        return result;
    }