private void buildInfluxDBMetricReporter()

in gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java [727:804]


  /**
   * Builds the InfluxDB metric reporter and/or event reporter according to the given configuration.
   *
   * <p>Returns without doing anything when both the metrics flag and the events flag are disabled.
   * Otherwise the InfluxDB database name property must be present. A failure to create one of the
   * reporters does not prevent the attempt to create the other one; all failures are collected and
   * thrown together at the end.
   *
   * @param properties configuration from which the InfluxDB reporting settings are read
   * @throws MultiReporterException if the InfluxDB database name is missing, or if at least one of
   *         the enabled reporters could not be created
   */
  private void buildInfluxDBMetricReporter(Properties properties)
      throws MultiReporterException {
    List<MetricReporterException> reporterExceptionList = Lists.newArrayList();

    boolean metricsEnabled = PropertiesUtils
        .getPropAsBoolean(properties, ConfigurationKeys.METRICS_REPORTING_INFLUXDB_METRICS_ENABLED_KEY,
            ConfigurationKeys.DEFAULT_METRICS_REPORTING_INFLUXDB_METRICS_ENABLED);
    if (metricsEnabled) {
      LOGGER.info("Reporting metrics to InfluxDB");
    }

    boolean eventsEnabled = PropertiesUtils
        .getPropAsBoolean(properties, ConfigurationKeys.METRICS_REPORTING_INFLUXDB_EVENTS_ENABLED_KEY,
            ConfigurationKeys.DEFAULT_METRICS_REPORTING_INFLUXDB_EVENTS_ENABLED);
    if (eventsEnabled) {
      LOGGER.info("Reporting events to InfluxDB");
    }

    if (!metricsEnabled && !eventsEnabled) {
      return;
    }

    // The database name is mandatory as soon as either reporter is enabled. Check it directly
    // instead of throwing and immediately catching an exception just to branch on it.
    if (!properties.containsKey(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_DATABASE)) {
      IllegalArgumentException missingDatabase =
          new IllegalArgumentException("InfluxDB database name is missing.");
      reporterExceptionList.add(new MetricReporterException("Missing InfluxDB configuration(s)", missingDatabase, ReporterType.METRIC_EVENT, ReporterSinkType.INFLUXDB));
      throw new MultiReporterException("Failed to start one or more InfluxDB reporters", reporterExceptionList);
    }

    String url = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_URL);
    String username = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_USER);
    String password = PasswordManager.getInstance(properties)
        .readPassword(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_PASSWORD));
    String database = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_DATABASE);

    InfluxDBConnectionType connectionType;
    // Upper-case with a fixed locale: the default-locale variant can change letters (e.g. 'i' under
    // a Turkish locale), which would make a valid enum name fail to match.
    String type = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_SENDING_TYPE,
        ConfigurationKeys.DEFAULT_METRICS_REPORTING_INFLUXDB_SENDING_TYPE).toUpperCase(java.util.Locale.ROOT);
    try {
      connectionType = InfluxDBConnectionType.valueOf(type);
    } catch (IllegalArgumentException exception) {
      // An unknown connection type is not fatal: log it and fall back to TCP.
      LOGGER
          .warn("InfluxDB Reporter connection type " + type + " not recognized. Will use TCP for sending.", exception);
      connectionType = InfluxDBConnectionType.TCP;
    }

    if (metricsEnabled) {
      try {
        // NOTE(review): the built reporter is not retained or registered here; presumably the
        // factory takes care of registration - confirm.
        InfluxDBReporter.Factory.newBuilder().withConnectionType(connectionType)
            .withConnection(url, username, password, database).withMetricContextName(
            this.metricContext.getName()) // contains the current job id
            .build(properties);
      } catch (IOException e) {
        // Record the failure and keep going so the event reporter still gets a chance to start.
        reporterExceptionList.add(new MetricReporterException("Failed to create InfluxDB metrics reporter.", e, ReporterType.METRIC, ReporterSinkType.INFLUXDB));
      }
    }

    if (eventsEnabled) {
      String eventsDbProp = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_EVENTS_DATABASE);
      // A dedicated events database wins; otherwise the metrics database is reused, but only when
      // metrics reporting is enabled.
      // NOTE(review): with metrics disabled and no events database configured this resolves to
      // null even though the metrics database is known to be set - confirm this is intended.
      String eventsDatabase = (eventsDbProp == null) ? (metricsEnabled ? database : null) : eventsDbProp;
      try {
        InfluxDBEventReporter eventReporter =
            InfluxDBEventReporter.Factory.forContext(RootMetricContext.get())
              .withConnectionType(connectionType)
              .withConnection(url, username, password, eventsDatabase)
              .build();
        // Register with the closer and track the reporter in the scheduled reporters collection.
        this.codahaleScheduledReporters.add(this.codahaleReportersCloser.register(eventReporter));
      }
      catch (IOException e) {
        reporterExceptionList.add(new MetricReporterException("Failed to create InfluxDB event reporter.", e, ReporterType.EVENT, ReporterSinkType.INFLUXDB));
      }
    }

    // Surface every collected failure at once.
    if (!reporterExceptionList.isEmpty()) {
      throw new MultiReporterException("Failed to create one or more InfluxDB reporters", reporterExceptionList);
    }
  }