in ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/util/BatchWrite.java [92:137]
/**
 * Store the batching configuration and, when requested, create and register OpenCensus metrics.
 *
 * <p>When {@code enableOpenCensus} is true, six {@link MeasureLong} measures are created whose
 * names share a prefix derived from the runtime class name (package stripped, {@code $} replaced
 * with {@code _}, lower-cased), and one view per measure is registered with the global
 * {@link ViewManager}. When it is false, all six measure fields are set to {@code null}.
 *
 * <p>NOTE(review): the limit parameters are only stored here; how they are enforced is decided
 * by code outside this constructor.
 *
 * @param maxBytes byte limit stored for later use (presumably per batch; confirm in batch logic)
 * @param maxMessages message-count limit stored for later use (presumably per batch)
 * @param maxDelay delay limit stored for later use (presumably how long a batch may stay open)
 * @param batchKeyTemplate template stored for later use (presumably derives a batch key from a
 *     message; confirm against callers)
 * @param executor executor stored for later use
 * @param enableOpenCensus whether to create measures and register views for them
 */
public BatchWrite(long maxBytes, int maxMessages, Duration maxDelay,
    PubsubMessageToTemplatedString batchKeyTemplate, Executor executor,
    boolean enableOpenCensus) {
  this.maxBytes = maxBytes;
  this.maxMessages = maxMessages;
  this.maxDelay = maxDelay;
  this.batchKeyTemplate = batchKeyTemplate;
  this.executor = executor;
  this.enableOpenCensus = enableOpenCensus;
  if (enableOpenCensus) {
    // Create OpenCensus measures with a class name prefix. getName() (rather than
    // getSimpleName()) is used so that nested classes keep their "Outer$Inner" form after the
    // package prefix is stripped.
    final String shortClassName = this.getClass().getName().replaceAll(".*[.]", "");
    // Lower-case with Locale.ROOT so metric names are identical regardless of the JVM's default
    // locale (e.g. a Turkish locale would otherwise map 'I' to a dotless 'ı').
    final String batchType = shortClassName.replace('$', '_')
        .toLowerCase(java.util.Locale.ROOT);
    totalBytes = MeasureLong.create(batchType + "_total_bytes",
        "The number of bytes received in " + shortClassName, "B");
    totalMessages = MeasureLong.create(batchType + "_total_messages",
        "The number of messages received in " + shortClassName, "1");
    batchCount = MeasureLong.create(batchType + "_batch_count",
        "The number of batches closed in " + shortClassName, "1");
    batchBytes = MeasureLong.create(batchType + "_batch_bytes",
        "Distribution of the number of bytes in a batch in " + shortClassName, "B");
    batchMessages = MeasureLong.create(batchType + "_batch_messages",
        "Distribution of the number of messages in a batch in " + shortClassName, "1");
    batchDelay = MeasureLong.create(batchType + "_batch_delay",
        "Distribution of the number of milliseconds a batch waited for messages in "
            + shortClassName,
        "ms");
    // Register a view for every measure; each view reuses the measure's own name and
    // description and declares no tag columns.
    final ViewManager viewManager = Stats.getViewManager();
    ImmutableMap.<MeasureLong, Aggregation>builder()
        .put(batchCount, COUNT_AGG)
        .put(batchBytes, BATCH_BYTES_AGG)
        .put(batchMessages, BATCH_MESSAGES_AGG)
        .put(batchDelay, BATCH_DELAY_AGG)
        .put(totalBytes, SUM_AGG)
        .put(totalMessages, SUM_AGG)
        .build()
        .forEach((measure, aggregation) -> viewManager
            .registerView(View.create(View.Name.create(measure.getName()),
                measure.getDescription(), measure, aggregation, ImmutableList.of())));
  } else {
    // Metrics disabled: explicitly null every measure field so each is assigned on this path too.
    batchCount = null;
    batchBytes = null;
    batchMessages = null;
    batchDelay = null;
    totalBytes = null;
    totalMessages = null;
  }
}