stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java [99:140]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        writer.write(record).whenComplete(new FutureEventListener<DLSN>() {
            @Override
            public void onSuccess(DLSN value) {
                synchronized (syncPos) {
                    syncPos[0] = record.getTransactionId();
                }
            }

            @Override
            public void onFailure(Throwable cause) {
                exceptionUpdater.compareAndSet(DLOutputStream.this, null, cause);
            }
        });
    }

    private CompletableFuture<DLSN> writeControlRecord() {
        LogRecord record;
        synchronized (this) {
            record = new LogRecord(writePos, Unpooled.wrappedBuffer(CONTROL_RECORD_CONTENT));
            record.setControl();
        }
        return writer.write(record);
    }

    @Override
    public void flush() throws IOException {
        try {
            FutureUtils.result(writeControlRecord());
        } catch (IOException ioe) {
            throw ioe;
        } catch (Exception e) {
            log.error("Unexpected exception in DLOutputStream", e);
            throw new UnexpectedException("unexpected exception in DLOutputStream#flush()", e);
        }
    }

    @Override
    public void close() throws IOException {
        Utils.ioResult(
            writeControlRecord()
                .thenCompose(ignored -> writer.asyncClose())
                .thenCompose(ignored -> dlm.asyncClose()));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLOutputStream.java [99:140]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        writer.write(record).whenComplete(new FutureEventListener<DLSN>() {
            @Override
            public void onSuccess(DLSN value) {
                synchronized (syncPos) {
                    syncPos[0] = record.getTransactionId();
                }
            }

            @Override
            public void onFailure(Throwable cause) {
                exceptionUpdater.compareAndSet(DLOutputStream.this, null, cause);
            }
        });
    }

    private CompletableFuture<DLSN> writeControlRecord() {
        LogRecord record;
        synchronized (this) {
            record = new LogRecord(writePos, Unpooled.wrappedBuffer(CONTROL_RECORD_CONTENT));
            record.setControl();
        }
        return writer.write(record);
    }

    @Override
    public void flush() throws IOException {
        try {
            FutureUtils.result(writeControlRecord());
        } catch (IOException ioe) {
            throw ioe;
        } catch (Exception e) {
            log.error("Unexpected exception in DLOutputStream", e);
            throw new UnexpectedException("unexpected exception in DLOutputStream#flush()", e);
        }
    }

    @Override
    public void close() throws IOException {
        Utils.ioResult(
            writeControlRecord()
                .thenCompose(ignored -> writer.asyncClose())
                .thenCompose(ignored -> dlm.asyncClose()));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



