public BatchWrite()

in ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/util/BatchWrite.java [92:137]


  public BatchWrite(long maxBytes, int maxMessages, Duration maxDelay,
      PubsubMessageToTemplatedString batchKeyTemplate, Executor executor,
      boolean enableOpenCensus) {
    this.maxBytes = maxBytes;
    this.maxMessages = maxMessages;
    this.maxDelay = maxDelay;
    this.batchKeyTemplate = batchKeyTemplate;
    this.executor = executor;
    this.enableOpenCensus = enableOpenCensus;
    if (enableOpenCensus) {
      // create OpenCensus measures with a class name prefix
      final String shortClassName = this.getClass().getName().replaceAll(".*[.]", "");
      final String batchType = shortClassName.replace('$', '_').toLowerCase();
      totalBytes = MeasureLong.create(batchType + "_total_bytes",
          "The number of bytes received in " + shortClassName, "B");
      totalMessages = MeasureLong.create(batchType + "_total_messages",
          "The number of messages received in " + shortClassName, "1");
      batchCount = MeasureLong.create(batchType + "_batch_count",
          "The number of batches closed in " + shortClassName, "1");
      batchBytes = MeasureLong.create(batchType + "_batch_bytes",
          "Distribution of the number of bytes in a batch in " + shortClassName, "B");
      batchMessages = MeasureLong.create(batchType + "_batch_messages",
          "Distribution of the number of messages in a batch in " + shortClassName, "1");
      batchDelay = MeasureLong.create(batchType + "_batch_delay",
          "Distribution of the number of milliseconds a batch waited for messages in "
              + shortClassName,
          "ms");

      // register a view for every measure
      final ViewManager viewManager = Stats.getViewManager();
      ImmutableMap.<MeasureLong, Aggregation>builder().put(batchCount, COUNT_AGG)
          .put(batchBytes, BATCH_BYTES_AGG).put(batchMessages, BATCH_MESSAGES_AGG)
          .put(batchDelay, BATCH_DELAY_AGG).put(totalBytes, SUM_AGG).put(totalMessages, SUM_AGG)
          .build()
          .forEach((measure, aggregation) -> viewManager
              .registerView(View.create(View.Name.create(measure.getName()),
                  measure.getDescription(), measure, aggregation, ImmutableList.of())));
    } else {
      batchCount = null;
      batchBytes = null;
      batchMessages = null;
      batchDelay = null;
      totalBytes = null;
      totalMessages = null;
    }
  }