protected BKLogSegmentWriter()

in stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java [257:397]


    protected BKLogSegmentWriter(String streamName,
                                 String logSegmentName,
                                 DistributedLogConfiguration conf,
                                 int logSegmentMetadataVersion,
                                 LogSegmentEntryWriter entryWriter,
                                 DistributedLock lock, /** the lock needs to be acquired **/
                                 long startTxId,
                                 long logSegmentSequenceNumber,
                                 OrderedScheduler scheduler,
                                 StatsLogger statsLogger,
                                 StatsLogger perLogStatsLogger,
                                 AlertStatsLogger alertStatsLogger,
                                 PermitLimiter globalWriteLimiter,
                                 FeatureProvider featureProvider,
                                 DynamicDistributedLogConfiguration dynConf)
        throws IOException {
        super();

        // set up a write limiter
        PermitLimiter streamWriteLimiter = null;
        if (conf.getPerWriterOutstandingWriteLimit() < 0) {
            streamWriteLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
        } else {
            Feature disableWriteLimitFeature = featureProvider.getFeature(
                CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase());
            streamWriteLimiter = new SimplePermitLimiter(
                conf.getOutstandingWriteLimitDarkmode(),
                conf.getPerWriterOutstandingWriteLimit(),
                statsLogger.scope("streamWriteLimiter"),
                false,
                disableWriteLimitFeature);
        }
        this.writeLimiter = new WriteLimiter(streamName, streamWriteLimiter, globalWriteLimiter);
        this.alertStatsLogger = alertStatsLogger;

        StatsLogger flushStatsLogger = statsLogger.scope("flush");
        StatsLogger pFlushStatsLogger = flushStatsLogger.scope("periodic");
        pFlushSuccesses = pFlushStatsLogger.getCounter("success");
        pFlushMisses = pFlushStatsLogger.getCounter("miss");

        // transmit
        StatsLogger transmitDataStatsLogger = statsLogger.scope("data");
        transmitDataSuccesses = transmitDataStatsLogger.getCounter("success");
        transmitDataMisses = transmitDataStatsLogger.getCounter("miss");
        StatsLogger transmitStatsLogger = statsLogger.scope("transmit");
        transmitDataPacketSize =  transmitStatsLogger.getOpStatsLogger("packetsize");
        StatsLogger transmitControlStatsLogger = statsLogger.scope("control");
        transmitControlSuccesses = transmitControlStatsLogger.getCounter("success");
        StatsLogger segWriterStatsLogger = statsLogger.scope("seg_writer");
        writeTime = segWriterStatsLogger.getOpStatsLogger("write");
        addCompleteTime = segWriterStatsLogger.scope("add_complete").getOpStatsLogger("callback");
        addCompleteQueuedTime = segWriterStatsLogger.scope("add_complete").getOpStatsLogger("queued");
        addCompleteDeferredTime = segWriterStatsLogger.scope("add_complete").getOpStatsLogger("deferred");
        pendingWrites = segWriterStatsLogger.getCounter("pending");

        // outstanding transmit requests
        transmitOutstandingLogger = perLogStatsLogger.scope("transmit").scope("outstanding");
        transmitOutstandingGauge = new Gauge<Number>() {
            @Override
            public Number getDefaultValue() {
                return 0;
            }
            @Override
            public Number getSample() {
                return outstandingTransmitsUpdater.get(BKLogSegmentWriter.this);
            }
        };
        transmitOutstandingLogger.registerGauge("requests", transmitOutstandingGauge);

        this.fullyQualifiedLogSegment = streamName + ":" + logSegmentName;
        this.streamName = streamName;
        this.logSegmentMetadataVersion = logSegmentMetadataVersion;
        this.entryWriter = entryWriter;
        this.lock = lock;
        this.lock.checkOwnershipAndReacquire();

        final int configuredTransmissionThreshold = dynConf.getOutputBufferSize();
        if (configuredTransmissionThreshold > MAX_LOGRECORDSET_SIZE) {
            LOG.warn("Setting output buffer size {} greater than max transmission size {} for log segment {}",
                configuredTransmissionThreshold, MAX_LOGRECORDSET_SIZE, fullyQualifiedLogSegment);
            this.transmissionThreshold = MAX_LOGRECORDSET_SIZE;
        } else {
            this.transmissionThreshold = configuredTransmissionThreshold;
        }
        this.compressionType = CompressionUtils.stringToType(conf.getCompressionType());

        this.logSegmentSequenceNumber = logSegmentSequenceNumber;
        this.recordSetWriter = Entry.newEntry(
                streamName,
                Math.max(transmissionThreshold, 1024),
                envelopeBeforeTransmit(),
                compressionType);
        this.packetPrevious = null;
        this.startTxId = startTxId;
        this.lastTxId = startTxId;
        this.lastTxIdAcknowledged = startTxId;
        this.enableRecordCounts = conf.getEnableRecordCounts();
        this.immediateFlushEnabled = conf.getImmediateFlushEnabled();
        this.isDurableWriteEnabled = dynConf.isDurableWriteEnabled();
        this.scheduler = scheduler;

        // Failure injection
        if (conf.getEIInjectWriteDelay()) {
            this.writeDelayInjector = new RandomDelayFailureInjector(dynConf);
        } else {
            this.writeDelayInjector = FailureInjector.NULL;
        }

        // If we are transmitting immediately (threshold == 0) and if immediate
        // flush is enabled, we don't need the periodic flush task
        final int configuredPeriodicFlushFrequency = dynConf.getPeriodicFlushFrequencyMilliSeconds();
        if (!immediateFlushEnabled || (0 != this.transmissionThreshold)) {
            int periodicFlushFrequency = configuredPeriodicFlushFrequency;
            if (periodicFlushFrequency > 0 && scheduler != null) {
                periodicFlushSchedule = scheduler.scheduleAtFixedRate(this,
                        periodicFlushFrequency / 2, periodicFlushFrequency / 2, TimeUnit.MILLISECONDS);
            } else {
                periodicFlushSchedule = null;
            }
        } else {
            // Min delay heuristic applies only when immediate flush is enabled
            // and transmission threshold is zero
            minDelayBetweenImmediateFlushMs = conf.getMinDelayBetweenImmediateFlushMs();
            periodicFlushSchedule = null;
        }
        this.periodicKeepAliveMs = conf.getPeriodicKeepAliveMilliSeconds();
        if (periodicKeepAliveMs > 0 && scheduler != null) {
            periodicKeepAliveSchedule = scheduler.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    keepAlive();
                }
            }, periodicKeepAliveMs, periodicKeepAliveMs, TimeUnit.MILLISECONDS);
        } else {
            periodicKeepAliveSchedule = null;
        }

        this.conf = conf;
        assert(!this.immediateFlushEnabled || (null != this.scheduler));
        this.lastTransmit = Stopwatch.createStarted();
    }