void batchRecoverLedgerFragmentEntry()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java [448:552]


    /**
     * Replicates the entries {@code [startEntryId, endEntryId]} of a ledger to every bookie in
     * {@code newBookies}, reading them from the ledger with a batch read.
     *
     * <p>One {@code asyncBatchReadEntries} call is issued for the range. For each entry returned, the
     * digest package is recomputed and sent to each new bookie with
     * {@code BookieProtocol.FLAG_RECOVERY_ADD}. If the batch ends before {@code endEntryId}, this
     * method calls itself for the remaining range.
     *
     * <p>{@code ledgerFragmentMcb} is invoked once per entry of the range:
     * <ul>
     *   <li>read failure: once for every entry of the requested range, with the read return code;</li>
     *   <li>write: once per entry, either on the first failed write or when all new bookies have
     *       acknowledged the entry;</li>
     *   <li>interruption while scheduling the next batch: once for every entry not yet read, with
     *       {@code BKException.Code.InterruptedException}.</li>
     * </ul>
     *
     * @param startEntryId first entry id to replicate (inclusive)
     * @param endEntryId last entry id to replicate (inclusive)
     * @param lh handle of the ledger the entries are read from
     * @param ledgerFragmentMcb callback notified once per entry, as described above
     * @param newBookies bookies that receive a copy of each entry
     * @param onReadEntryFailureCallback invoked with (ledgerId, startEntryId) when the batch read fails
     * @throws InterruptedException presumably raised by {@code replicationThrottle.acquire} when the
     *         calling thread is interrupted while waiting for permits (TODO confirm against the
     *         throttle implementation)
     */
    void batchRecoverLedgerFragmentEntry(final long startEntryId,
                                         final long endEntryId,
                                         final LedgerHandle lh,
                                         final AsyncCallback.VoidCallback ledgerFragmentMcb,
                                         final Set<BookieId> newBookies,
                                         final BiConsumer<Long, Long> onReadEntryFailureCallback)
            throws InterruptedException {
        int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1);
        int maxBytesToReplicate = conf.getReplicationRateByBytes();
        if (replicationThrottle != null) {
            // Estimate the batch size from the running average entry size. The product is computed
            // in long arithmetic because int * int can silently wrap for large entries/ranges, which
            // would hand a bogus (possibly negative) byte count to the throttle and the batch read.
            // The average is read once so the compared and the assigned value cannot differ.
            final long estimatedBytes = (long) averageEntrySize.get() * entriesToReplicateCnt;
            if (maxBytesToReplicate != -1 && maxBytesToReplicate > estimatedBytes) {
                // Safe narrowing: estimatedBytes is strictly below an int value here.
                maxBytesToReplicate = (int) estimatedBytes;
            }
            replicationThrottle.acquire(maxBytesToReplicate);
        }

        lh.asyncBatchReadEntries(startEntryId, entriesToReplicateCnt, maxBytesToReplicate,
            new ReadCallback() {
                @Override
                public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
                    if (rc != BKException.Code.OK) {
                        LOG.error("BK error reading ledger entries: {} - {}",
                                startEntryId, endEntryId, BKException.create(rc));
                        onReadEntryFailureCallback.accept(lh.getId(), startEntryId);
                        // The whole requested range failed: report the error once per entry so the
                        // per-entry callback accounting stays balanced.
                        for (int i = 0; i < entriesToReplicateCnt; i++) {
                            ledgerFragmentMcb.processResult(rc, null, null);
                        }
                        return;
                    }
                    // Tracks the id of the last entry this batch actually returned.
                    // NOTE(review): if a successful read ever returns no entries, this stays at
                    // startEntryId and the follow-up batch below starts at startEntryId + 1 --
                    // confirm that the batch read always yields at least one entry.
                    long lastEntryId = startEntryId;
                    while (seq.hasMoreElements()) {
                        LedgerEntry entry = seq.nextElement();
                        lastEntryId = entry.getEntryId();
                        byte[] data = entry.getEntry();
                        final long dataLength = data.length;
                        numEntriesRead.inc();
                        numBytesRead.registerSuccessfulValue(dataLength);

                        ReferenceCounted toSend = lh.getDigestManager()
                                .computeDigestAndPackageForSending(entry.getEntryId(),
                                        lh.getLastAddConfirmed(), entry.getLength(),
                                        Unpooled.wrappedBuffer(data, 0, data.length),
                                        lh.getLedgerKey(),
                                        BookieProtocol.FLAG_RECOVERY_ADD);
                        if (replicationThrottle != null) {
                            // Feed the packaged size back into the running average used to size
                            // the next batch.
                            if (toSend instanceof ByteBuf) {
                                updateAverageEntrySize(((ByteBuf) toSend).readableBytes());
                            } else if (toSend instanceof ByteBufList) {
                                updateAverageEntrySize(((ByteBufList) toSend).readableBytes());
                            }
                        }
                        // Per-entry state shared by the writes to all new bookies:
                        // numCompleted counts successful writes, completed guarantees that
                        // ledgerFragmentMcb fires exactly once for this entry.
                        AtomicInteger numCompleted = new AtomicInteger(0);
                        AtomicBoolean completed = new AtomicBoolean(false);

                        WriteCallback multiWriteCallback = new WriteCallback() {
                            @Override
                            public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
                                if (rc != BKException.Code.OK) {
                                    LOG.error("BK error writing entry for ledgerId: {}, entryId: {}, bookie: {}",
                                            ledgerId, entryId, addr, BKException.create(rc));
                                    // Only the first failure is reported for this entry.
                                    if (completed.compareAndSet(false, true)) {
                                        ledgerFragmentMcb.processResult(rc, null, null);
                                    }
                                } else {
                                    numEntriesWritten.inc();
                                    // ctx is expected to carry the entry's data length (see the
                                    // addEntry call below).
                                    if (ctx instanceof Long) {
                                        numBytesWritten.registerSuccessfulValue((Long) ctx);
                                    }
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug("Success writing ledger id {}, entry id {} to a new bookie {}!",
                                                ledgerId, entryId, addr);
                                    }
                                    // Report success only once every new bookie has the entry.
                                    if (numCompleted.incrementAndGet() == newBookies.size()
                                            && completed.compareAndSet(false, true)) {
                                        ledgerFragmentMcb.processResult(rc, null, null);
                                    }
                                }
                            }
                        };

                        for (BookieId newBookie : newBookies) {
                            long startWriteEntryTime = MathUtils.nowInNano();
                            bkc.getBookieClient().addEntry(newBookie, lh.getId(),
                                    lh.getLedgerKey(), entry.getEntryId(), toSend,
                                    multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD,
                                    false, WriteFlag.NONE);
                            writeDataLatency.registerSuccessfulEvent(
                                    MathUtils.elapsedNanos(startWriteEntryTime), TimeUnit.NANOSECONDS);
                        }
                        // Drop this method's reference to the packaged entry now that it has been
                        // handed to every bookie (presumably addEntry retains what it needs --
                        // verify against the bookie client).
                        toSend.release();
                    }
                    if (lastEntryId != endEntryId) {
                        // The batch stopped short of the requested range: continue with the rest.
                        try {
                            batchRecoverLedgerFragmentEntry(lastEntryId + 1, endEntryId, lh,
                                    ledgerFragmentMcb, newBookies, onReadEntryFailureCallback);
                        } catch (InterruptedException e) {
                            // Restore the interrupt status so the interruption is not lost to the
                            // code that owns this thread, then fail every entry not yet read.
                            Thread.currentThread().interrupt();
                            int remainingEntries = (int) (endEntryId - lastEntryId);
                            for (int i = 0; i < remainingEntries; i++) {
                                ledgerFragmentMcb.processResult(BKException.Code.InterruptedException, null, null);
                            }
                        }
                    }
                }
            }, null);
    }