stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java [42:97]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@Slf4j
class DLOutputStream extends OutputStream {

    private static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);

    private final DistributedLogManager dlm;
    private final AsyncLogWriter writer;

    // positions
    private final long[] syncPos = new long[1];
    private long writePos = 0L;

    // state
    private static final AtomicReferenceFieldUpdater<DLOutputStream, Throwable> exceptionUpdater =
        AtomicReferenceFieldUpdater.newUpdater(DLOutputStream.class, Throwable.class, "exception");
    private volatile Throwable exception = null;

    DLOutputStream(DistributedLogManager dlm,
                   AsyncLogWriter writer) {
        this.dlm = dlm;
        this.writer = writer;
        this.writePos = writer.getLastTxId() < 0L ? 0L : writer.getLastTxId();
        this.syncPos[0] = writePos;
    }

    public synchronized long position() {
        return syncPos[0];
    }

    @Override
    public void write(int b) throws IOException {
        byte[] data = new byte[] { (byte) b };
        write(data);
    }

    @Override
    public void write(byte[] b) throws IOException {
        write(Unpooled.wrappedBuffer(b));
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        write(Unpooled.wrappedBuffer(b, off, len));
    }

    private synchronized void write(ByteBuf buf) throws IOException {
        Throwable cause = exceptionUpdater.get(this);
        if (null != cause) {
            if (cause instanceof IOException) {
                throw (IOException) cause;
            } else {
                throw new UnexpectedException("Encountered unknown issue", cause);
            }
        }

        writePos += buf.readableBytes();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLOutputStream.java [42:97]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@Slf4j
class DLOutputStream extends OutputStream {

    private static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);

    private final DistributedLogManager dlm;
    private final AsyncLogWriter writer;

    // positions
    private final long[] syncPos = new long[1];
    private long writePos = 0L;

    // state
    private static final AtomicReferenceFieldUpdater<DLOutputStream, Throwable> exceptionUpdater =
        AtomicReferenceFieldUpdater.newUpdater(DLOutputStream.class, Throwable.class, "exception");
    private volatile Throwable exception = null;

    DLOutputStream(DistributedLogManager dlm,
                   AsyncLogWriter writer) {
        this.dlm = dlm;
        this.writer = writer;
        this.writePos = writer.getLastTxId() < 0L ? 0L : writer.getLastTxId();
        this.syncPos[0] = writePos;
    }

    public synchronized long position() {
        return syncPos[0];
    }

    @Override
    public void write(int b) throws IOException {
        byte[] data = new byte[] { (byte) b };
        write(data);
    }

    @Override
    public void write(byte[] b) throws IOException {
        write(Unpooled.wrappedBuffer(b));
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        write(Unpooled.wrappedBuffer(b, off, len));
    }

    private synchronized void write(ByteBuf buf) throws IOException {
        Throwable cause = exceptionUpdater.get(this);
        if (null != cause) {
            if (cause instanceof IOException) {
                throw (IOException) cause;
            } else {
                throw new UnexpectedException("Encountered unknown issue", cause);
            }
        }

        writePos += buf.readableBytes();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



