in stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java [257:397]
/**
 * Creates a writer for one log segment and wires up its write limiting,
 * statistics, buffering and background tasks.
 *
 * <p>The constructor verifies lock ownership first, then builds the per-stream
 * write limiter, registers stats, sizes the output buffer, and finally (once
 * every field is assigned) schedules the periodic flush and keep-alive tasks.
 *
 * @param streamName name of the stream; used for the write limiter, the record
 *        set writer and the {@code stream:segment} identifier used in logging
 * @param logSegmentName name of the log segment; combined with
 *        {@code streamName} into the fully qualified segment name
 * @param conf static configuration; read for the per-writer outstanding write
 *        limit, compression type, record counts, immediate flush, write-delay
 *        injection, minimum immediate-flush delay and keep-alive period
 * @param logSegmentMetadataVersion metadata version stored on the writer
 * @param entryWriter entry writer stored on the writer
 * @param lock the lock needs to be acquired; its ownership is checked (and
 *        re-acquired if needed) via {@code checkOwnershipAndReacquire()}
 * @param startTxId initial value of the start, last and last-acknowledged
 *        transaction ids
 * @param logSegmentSequenceNumber sequence number stored on the writer
 * @param scheduler scheduler for the periodic flush and keep-alive tasks; when
 *        {@code null} no background task is scheduled, and immediate flush must
 *        then be disabled (asserted)
 * @param statsLogger parent scope for the limiter, flush, transmit and
 *        segment-writer stats
 * @param perLogStatsLogger scope under which the outstanding-transmits gauge
 *        is registered
 * @param alertStatsLogger alert logger stored on the writer
 * @param globalWriteLimiter limiter combined with the per-stream limiter
 * @param featureProvider source of the "disable write limit" feature flag
 * @param dynConf dynamic configuration; read for the output buffer size,
 *        durable-write flag, periodic flush frequency and delay injection
 * @throws IOException presumably when the lock ownership check fails; no other
 *         checked-exception source is visible in this constructor - confirm
 */
protected BKLogSegmentWriter(String streamName,
                             String logSegmentName,
                             DistributedLogConfiguration conf,
                             int logSegmentMetadataVersion,
                             LogSegmentEntryWriter entryWriter,
                             DistributedLock lock,
                             long startTxId,
                             long logSegmentSequenceNumber,
                             OrderedScheduler scheduler,
                             StatsLogger statsLogger,
                             StatsLogger perLogStatsLogger,
                             AlertStatsLogger alertStatsLogger,
                             PermitLimiter globalWriteLimiter,
                             FeatureProvider featureProvider,
                             DynamicDistributedLogConfiguration dynConf)
        throws IOException {
    super();

    // Fail fast: verify lock ownership before any limiter or gauge is created,
    // so a failed check does not leave a registered gauge that references a
    // half-constructed writer.
    this.lock = lock;
    this.lock.checkOwnershipAndReacquire();

    // set up a write limiter
    final PermitLimiter streamWriteLimiter;
    if (conf.getPerWriterOutstandingWriteLimit() < 0) {
        // A negative limit means no per-stream limiting.
        streamWriteLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
    } else {
        // Locale.ROOT keeps the feature key stable regardless of the JVM's
        // default locale (e.g. Turkish dotless-i lowercasing of 'I').
        Feature disableWriteLimitFeature = featureProvider.getFeature(
                CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase(java.util.Locale.ROOT));
        streamWriteLimiter = new SimplePermitLimiter(
                conf.getOutstandingWriteLimitDarkmode(),
                conf.getPerWriterOutstandingWriteLimit(),
                statsLogger.scope("streamWriteLimiter"),
                false,
                disableWriteLimitFeature);
    }
    this.writeLimiter = new WriteLimiter(streamName, streamWriteLimiter, globalWriteLimiter);
    this.alertStatsLogger = alertStatsLogger;

    // periodic flush stats
    StatsLogger flushStatsLogger = statsLogger.scope("flush");
    StatsLogger pFlushStatsLogger = flushStatsLogger.scope("periodic");
    pFlushSuccesses = pFlushStatsLogger.getCounter("success");
    pFlushMisses = pFlushStatsLogger.getCounter("miss");

    // transmit
    StatsLogger transmitDataStatsLogger = statsLogger.scope("data");
    transmitDataSuccesses = transmitDataStatsLogger.getCounter("success");
    transmitDataMisses = transmitDataStatsLogger.getCounter("miss");
    StatsLogger transmitStatsLogger = statsLogger.scope("transmit");
    transmitDataPacketSize = transmitStatsLogger.getOpStatsLogger("packetsize");
    StatsLogger transmitControlStatsLogger = statsLogger.scope("control");
    transmitControlSuccesses = transmitControlStatsLogger.getCounter("success");

    // segment writer stats
    StatsLogger segWriterStatsLogger = statsLogger.scope("seg_writer");
    writeTime = segWriterStatsLogger.getOpStatsLogger("write");
    addCompleteTime = segWriterStatsLogger.scope("add_complete").getOpStatsLogger("callback");
    addCompleteQueuedTime = segWriterStatsLogger.scope("add_complete").getOpStatsLogger("queued");
    addCompleteDeferredTime = segWriterStatsLogger.scope("add_complete").getOpStatsLogger("deferred");
    pendingWrites = segWriterStatsLogger.getCounter("pending");

    // outstanding transmit requests
    transmitOutstandingLogger = perLogStatsLogger.scope("transmit").scope("outstanding");
    transmitOutstandingGauge = new Gauge<Number>() {
        @Override
        public Number getDefaultValue() {
            return 0;
        }

        @Override
        public Number getSample() {
            return outstandingTransmitsUpdater.get(BKLogSegmentWriter.this);
        }
    };
    transmitOutstandingLogger.registerGauge("requests", transmitOutstandingGauge);

    // identity of this segment writer
    this.fullyQualifiedLogSegment = streamName + ":" + logSegmentName;
    this.streamName = streamName;
    this.logSegmentMetadataVersion = logSegmentMetadataVersion;
    this.entryWriter = entryWriter;

    // Cap the configured output buffer size at the maximum record set size.
    final int configuredTransmissionThreshold = dynConf.getOutputBufferSize();
    if (configuredTransmissionThreshold > MAX_LOGRECORDSET_SIZE) {
        LOG.warn("Setting output buffer size {} greater than max transmission size {} for log segment {}",
                configuredTransmissionThreshold, MAX_LOGRECORDSET_SIZE, fullyQualifiedLogSegment);
        this.transmissionThreshold = MAX_LOGRECORDSET_SIZE;
    } else {
        this.transmissionThreshold = configuredTransmissionThreshold;
    }
    this.compressionType = CompressionUtils.stringToType(conf.getCompressionType());
    this.logSegmentSequenceNumber = logSegmentSequenceNumber;
    // The initial buffer is never smaller than 1024, even when the threshold is
    // 0 (transmit immediately).
    this.recordSetWriter = Entry.newEntry(
            streamName,
            Math.max(transmissionThreshold, 1024),
            envelopeBeforeTransmit(),
            compressionType);
    this.packetPrevious = null;
    this.startTxId = startTxId;
    this.lastTxId = startTxId;
    this.lastTxIdAcknowledged = startTxId;
    this.enableRecordCounts = conf.getEnableRecordCounts();
    this.immediateFlushEnabled = conf.getImmediateFlushEnabled();
    this.isDurableWriteEnabled = dynConf.isDurableWriteEnabled();
    this.scheduler = scheduler;
    assert(!this.immediateFlushEnabled || (null != this.scheduler));

    // Failure injection
    if (conf.getEIInjectWriteDelay()) {
        this.writeDelayInjector = new RandomDelayFailureInjector(dynConf);
    } else {
        this.writeDelayInjector = FailureInjector.NULL;
    }

    // Finish plain field initialization before handing `this` to the scheduler
    // below, so a background task never observes these fields unset.
    this.conf = conf;
    this.lastTransmit = Stopwatch.createStarted();

    // If we are transmitting immediately (threshold == 0) and if immediate
    // flush is enabled, we don't need the periodic flush task
    final int configuredPeriodicFlushFrequency = dynConf.getPeriodicFlushFrequencyMilliSeconds();
    if (!immediateFlushEnabled || (0 != this.transmissionThreshold)) {
        if (configuredPeriodicFlushFrequency > 0 && scheduler != null) {
            // Run at half the configured frequency, but never with a zero
            // period: a frequency of 1 ms would otherwise yield 0, which the
            // java.util.concurrent scheduling contract rejects with
            // IllegalArgumentException.
            final int halfPeriodMs = Math.max(1, configuredPeriodicFlushFrequency / 2);
            periodicFlushSchedule = scheduler.scheduleAtFixedRate(this,
                    halfPeriodMs, halfPeriodMs, TimeUnit.MILLISECONDS);
        } else {
            periodicFlushSchedule = null;
        }
    } else {
        // Min delay heuristic applies only when immediate flush is enabled
        // and transmission threshold is zero
        minDelayBetweenImmediateFlushMs = conf.getMinDelayBetweenImmediateFlushMs();
        periodicFlushSchedule = null;
    }

    // Periodic keep-alive, only when configured and a scheduler is available.
    this.periodicKeepAliveMs = conf.getPeriodicKeepAliveMilliSeconds();
    if (periodicKeepAliveMs > 0 && scheduler != null) {
        periodicKeepAliveSchedule = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                keepAlive();
            }
        }, periodicKeepAliveMs, periodicKeepAliveMs, TimeUnit.MILLISECONDS);
    } else {
        periodicKeepAliveSchedule = null;
    }
}