in stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java [878:1010]
private void doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
final String inprogressZnodeName,
long logSegmentSeqNo,
long logSegmentId,
long firstTxId,
long lastTxId,
int recordCount,
long lastEntryId,
long lastSlotId,
final CompletableFuture<LogSegmentMetadata> promise) {
try {
lock.checkOwnershipAndReacquire();
} catch (IOException ioe) {
FutureUtils.completeExceptionally(promise, ioe);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Completing and Closing Log Segment {} {}", firstTxId, lastTxId);
}
LogSegmentMetadata inprogressLogSegment = readLogSegmentFromCache(inprogressZnodeName);
// validate log segment
if (inprogressLogSegment.getLogSegmentId() != logSegmentId) {
FutureUtils.completeExceptionally(promise, new IOException(
"Active ledger has different ID to inprogress. "
+ inprogressLogSegment.getLogSegmentId() + " found, "
+ logSegmentId + " expected"));
return;
}
// validate the transaction id
if (inprogressLogSegment.getFirstTxId() != firstTxId) {
FutureUtils.completeExceptionally(promise, new IOException("Transaction id not as expected, "
+ inprogressLogSegment.getFirstTxId() + " found, " + firstTxId + " expected"));
return;
}
// validate the log sequence number
if (validateLogSegmentSequenceNumber) {
synchronized (inprogressLSSNs) {
if (inprogressLSSNs.isEmpty()) {
FutureUtils.completeExceptionally(promise, new UnexpectedException(
"Didn't find matched inprogress log segments when completing inprogress "
+ inprogressLogSegment));
return;
}
long leastInprogressLSSN = inprogressLSSNs.getFirst();
// the log segment sequence number in metadata
// {@link inprogressLogSegment.getLogSegmentSequenceNumber()}
// should be same as the sequence number we are completing (logSegmentSeqNo)
// and
// it should also be same as the least inprogress log segment sequence number
// tracked in {@link inprogressLSSNs}
if ((inprogressLogSegment.getLogSegmentSequenceNumber() != logSegmentSeqNo)
|| (leastInprogressLSSN != logSegmentSeqNo)) {
FutureUtils.completeExceptionally(promise, new UnexpectedException(
"Didn't find matched inprogress log segments when completing inprogress "
+ inprogressLogSegment));
return;
}
}
}
// store max sequence number.
long maxSeqNo = Math.max(logSegmentSeqNo, maxLogSegmentSequenceNo.getSequenceNumber());
if (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo
|| (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo + 1)) {
// ignore the case that a new inprogress log segment is pre-allocated
// before completing current inprogress one
LOG.info("Try storing max sequence number {} in completing {}.",
new Object[] { logSegmentSeqNo, inprogressLogSegment.getZkPath() });
} else {
LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}",
maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, getFullyQualifiedName());
if (validateLogSegmentSequenceNumber) {
FutureUtils.completeExceptionally(promise,
new DLIllegalStateException("Unexpected max log segment sequence number "
+ maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName()
+ ", expected " + (logSegmentSeqNo - 1)));
return;
}
}
// Prepare the completion
final String pathForCompletedLedger = completedLedgerZNode(firstTxId, lastTxId, logSegmentSeqNo);
long startSequenceId;
try {
startSequenceId = computeStartSequenceId(inprogressLogSegment);
} catch (IOException ioe) {
FutureUtils.completeExceptionally(promise, ioe);
return;
}
// write completed ledger znode
final LogSegmentMetadata completedLogSegment =
inprogressLogSegment.completeLogSegment(
pathForCompletedLedger,
lastTxId,
recordCount,
lastEntryId,
lastSlotId,
startSequenceId);
setLastLedgerRollingTimeMillis(completedLogSegment.getCompletionTime());
// prepare the transaction
Transaction<Object> txn = streamMetadataStore.newTransaction();
// create completed log segment
writeLogSegment(txn, completedLogSegment);
// delete inprogress log segment
deleteLogSegment(txn, inprogressLogSegment);
// store max sequence number
storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, maxSeqNo, false);
// update max txn id.
if (LOG.isDebugEnabled()) {
LOG.debug("Trying storing LastTxId in Finalize Path {} LastTxId {}",
pathForCompletedLedger, lastTxId);
}
storeMaxTxId(txn, maxTxId, lastTxId);
txn.execute().whenCompleteAsync(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
LOG.info("Completed {} to {} for {} : {}",
inprogressZnodeName, completedLogSegment.getSegmentName(),
getFullyQualifiedName(), completedLogSegment);
FutureUtils.complete(promise, completedLogSegment);
}
@Override
public void onFailure(Throwable cause) {
FutureUtils.completeExceptionally(promise, cause);
}
}, scheduler);
}