in bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java [448:552]
void batchRecoverLedgerFragmentEntry(final long startEntryId,
final long endEntryId,
final LedgerHandle lh,
final AsyncCallback.VoidCallback ledgerFragmentMcb,
final Set<BookieId> newBookies,
final BiConsumer<Long, Long> onReadEntryFailureCallback)
throws InterruptedException {
int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1);
int maxBytesToReplicate = conf.getReplicationRateByBytes();
if (replicationThrottle != null) {
if (maxBytesToReplicate != -1 && maxBytesToReplicate > averageEntrySize.get() * entriesToReplicateCnt) {
maxBytesToReplicate = averageEntrySize.get() * entriesToReplicateCnt;
}
replicationThrottle.acquire(maxBytesToReplicate);
}
lh.asyncBatchReadEntries(startEntryId, entriesToReplicateCnt, maxBytesToReplicate,
new ReadCallback() {
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
if (rc != BKException.Code.OK) {
LOG.error("BK error reading ledger entries: {} - {}",
startEntryId, endEntryId, BKException.create(rc));
onReadEntryFailureCallback.accept(lh.getId(), startEntryId);
for (int i = 0; i < entriesToReplicateCnt; i++) {
ledgerFragmentMcb.processResult(rc, null, null);
}
return;
}
long lastEntryId = startEntryId;
while (seq.hasMoreElements()) {
LedgerEntry entry = seq.nextElement();
lastEntryId = entry.getEntryId();
byte[] data = entry.getEntry();
final long dataLength = data.length;
numEntriesRead.inc();
numBytesRead.registerSuccessfulValue(dataLength);
ReferenceCounted toSend = lh.getDigestManager()
.computeDigestAndPackageForSending(entry.getEntryId(),
lh.getLastAddConfirmed(), entry.getLength(),
Unpooled.wrappedBuffer(data, 0, data.length),
lh.getLedgerKey(),
BookieProtocol.FLAG_RECOVERY_ADD);
if (replicationThrottle != null) {
if (toSend instanceof ByteBuf) {
updateAverageEntrySize(((ByteBuf) toSend).readableBytes());
} else if (toSend instanceof ByteBufList) {
updateAverageEntrySize(((ByteBufList) toSend).readableBytes());
}
}
AtomicInteger numCompleted = new AtomicInteger(0);
AtomicBoolean completed = new AtomicBoolean(false);
WriteCallback multiWriteCallback = new WriteCallback() {
@Override
public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
if (rc != BKException.Code.OK) {
LOG.error("BK error writing entry for ledgerId: {}, entryId: {}, bookie: {}",
ledgerId, entryId, addr, BKException.create(rc));
if (completed.compareAndSet(false, true)) {
ledgerFragmentMcb.processResult(rc, null, null);
}
} else {
numEntriesWritten.inc();
if (ctx instanceof Long) {
numBytesWritten.registerSuccessfulValue((Long) ctx);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Success writing ledger id {}, entry id {} to a new bookie {}!",
ledgerId, entryId, addr);
}
if (numCompleted.incrementAndGet() == newBookies.size()
&& completed.compareAndSet(false, true)) {
ledgerFragmentMcb.processResult(rc, null, null);
}
}
}
};
for (BookieId newBookie : newBookies) {
long startWriteEntryTime = MathUtils.nowInNano();
bkc.getBookieClient().addEntry(newBookie, lh.getId(),
lh.getLedgerKey(), entry.getEntryId(), toSend,
multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD,
false, WriteFlag.NONE);
writeDataLatency.registerSuccessfulEvent(
MathUtils.elapsedNanos(startWriteEntryTime), TimeUnit.NANOSECONDS);
}
toSend.release();
}
if (lastEntryId != endEntryId) {
try {
batchRecoverLedgerFragmentEntry(lastEntryId + 1, endEntryId, lh,
ledgerFragmentMcb, newBookies, onReadEntryFailureCallback);
} catch (InterruptedException e) {
int remainingEntries = (int) (endEntryId - lastEntryId);
for (int i = 0; i < remainingEntries; i++) {
ledgerFragmentMcb.processResult(BKException.Code.InterruptedException, null, null);
}
}
}
}
}, null);
}