private void doCompleteAndCloseLogSegmentAfterLogSegmentListFetched()

in stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java [878:1010]


    private void doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
            final String inprogressZnodeName,
            long logSegmentSeqNo,
            long logSegmentId,
            long firstTxId,
            long lastTxId,
            int recordCount,
            long lastEntryId,
            long lastSlotId,
            final CompletableFuture<LogSegmentMetadata> promise) {
        try {
            lock.checkOwnershipAndReacquire();
        } catch (IOException ioe) {
            FutureUtils.completeExceptionally(promise, ioe);
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Completing and Closing Log Segment {} {}", firstTxId, lastTxId);
        }
        LogSegmentMetadata inprogressLogSegment = readLogSegmentFromCache(inprogressZnodeName);

        // validate log segment
        if (inprogressLogSegment.getLogSegmentId() != logSegmentId) {
            FutureUtils.completeExceptionally(promise, new IOException(
                "Active ledger has different ID to inprogress. "
                    + inprogressLogSegment.getLogSegmentId() + " found, "
                    + logSegmentId + " expected"));
            return;
        }
        // validate the transaction id
        if (inprogressLogSegment.getFirstTxId() != firstTxId) {
            FutureUtils.completeExceptionally(promise, new IOException("Transaction id not as expected, "
                + inprogressLogSegment.getFirstTxId() + " found, " + firstTxId + " expected"));
            return;
        }
        // validate the log sequence number
        if (validateLogSegmentSequenceNumber) {
            synchronized (inprogressLSSNs) {
                if (inprogressLSSNs.isEmpty()) {
                    FutureUtils.completeExceptionally(promise, new UnexpectedException(
                            "Didn't find matched inprogress log segments when completing inprogress "
                                    + inprogressLogSegment));
                    return;
                }
                long leastInprogressLSSN = inprogressLSSNs.getFirst();
                // the log segment sequence number in metadata
                // {@link inprogressLogSegment.getLogSegmentSequenceNumber()}
                // should be same as the sequence number we are completing (logSegmentSeqNo)
                // and
                // it should also be same as the least inprogress log segment sequence number
                // tracked in {@link inprogressLSSNs}
                if ((inprogressLogSegment.getLogSegmentSequenceNumber() != logSegmentSeqNo)
                        || (leastInprogressLSSN != logSegmentSeqNo)) {
                    FutureUtils.completeExceptionally(promise, new UnexpectedException(
                            "Didn't find matched inprogress log segments when completing inprogress "
                                    + inprogressLogSegment));
                    return;
                }
            }
        }

        // store max sequence number.
        long maxSeqNo = Math.max(logSegmentSeqNo, maxLogSegmentSequenceNo.getSequenceNumber());
        if (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo
                || (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo + 1)) {
            // ignore the case that a new inprogress log segment is pre-allocated
            // before completing current inprogress one
            LOG.info("Try storing max sequence number {} in completing {}.",
                    new Object[] { logSegmentSeqNo, inprogressLogSegment.getZkPath() });
        } else {
            LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}",
                maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, getFullyQualifiedName());
            if (validateLogSegmentSequenceNumber) {
                FutureUtils.completeExceptionally(promise,
                        new DLIllegalStateException("Unexpected max log segment sequence number "
                        + maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName()
                        + ", expected " + (logSegmentSeqNo - 1)));
                return;
            }
        }

        // Prepare the completion
        final String pathForCompletedLedger = completedLedgerZNode(firstTxId, lastTxId, logSegmentSeqNo);
        long startSequenceId;
        try {
            startSequenceId = computeStartSequenceId(inprogressLogSegment);
        } catch (IOException ioe) {
            FutureUtils.completeExceptionally(promise, ioe);
            return;
        }
        // write completed ledger znode
        final LogSegmentMetadata completedLogSegment =
                inprogressLogSegment.completeLogSegment(
                        pathForCompletedLedger,
                        lastTxId,
                        recordCount,
                        lastEntryId,
                        lastSlotId,
                        startSequenceId);
        setLastLedgerRollingTimeMillis(completedLogSegment.getCompletionTime());

        // prepare the transaction
        Transaction<Object> txn = streamMetadataStore.newTransaction();

        // create completed log segment
        writeLogSegment(txn, completedLogSegment);
        // delete inprogress log segment
        deleteLogSegment(txn, inprogressLogSegment);
        // store max sequence number
        storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, maxSeqNo, false);
        // update max txn id.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Trying storing LastTxId in Finalize Path {} LastTxId {}",
                    pathForCompletedLedger, lastTxId);
        }
        storeMaxTxId(txn, maxTxId, lastTxId);

        txn.execute().whenCompleteAsync(new FutureEventListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                LOG.info("Completed {} to {} for {} : {}",
                    inprogressZnodeName, completedLogSegment.getSegmentName(),
                    getFullyQualifiedName(), completedLogSegment);
                FutureUtils.complete(promise, completedLogSegment);
            }

            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.completeExceptionally(promise, cause);
            }
        }, scheduler);
    }