in gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java [727:804]
private void buildInfluxDBMetricReporter(Properties properties)
throws MultiReporterException {
List<MetricReporterException> reporterExceptionList = Lists.newArrayList();
boolean metricsEnabled = PropertiesUtils
.getPropAsBoolean(properties, ConfigurationKeys.METRICS_REPORTING_INFLUXDB_METRICS_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_INFLUXDB_METRICS_ENABLED);
if (metricsEnabled) {
LOGGER.info("Reporting metrics to InfluxDB");
}
boolean eventsEnabled = PropertiesUtils
.getPropAsBoolean(properties, ConfigurationKeys.METRICS_REPORTING_INFLUXDB_EVENTS_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_INFLUXDB_EVENTS_ENABLED);
if (eventsEnabled) {
LOGGER.info("Reporting events to InfluxDB");
}
if (!metricsEnabled && !eventsEnabled) {
return;
}
try {
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_DATABASE),
"InfluxDB database name is missing.");
} catch (IllegalArgumentException exception) {
reporterExceptionList.add(new MetricReporterException("Missing InfluxDB configuration(s)", exception, ReporterType.METRIC_EVENT, ReporterSinkType.INFLUXDB));
throw new MultiReporterException("Failed to start one or more InfluxDB reporters", reporterExceptionList);
}
String url = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_URL);
String username = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_USER);
String password = PasswordManager.getInstance(properties)
.readPassword(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_PASSWORD));
String database = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_DATABASE);
InfluxDBConnectionType connectionType;
String type = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_SENDING_TYPE,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_INFLUXDB_SENDING_TYPE).toUpperCase();
try {
connectionType = InfluxDBConnectionType.valueOf(type);
} catch (IllegalArgumentException exception) {
LOGGER
.warn("InfluxDB Reporter connection type " + type + " not recognized. Will use TCP for sending.", exception);
connectionType = InfluxDBConnectionType.TCP;
}
if (metricsEnabled) {
try {
InfluxDBReporter.Factory.newBuilder().withConnectionType(connectionType)
.withConnection(url, username, password, database).withMetricContextName(
this.metricContext.getName()) // contains the current job id
.build(properties);
} catch (IOException e) {
reporterExceptionList.add(new MetricReporterException("Failed to create InfluxDB metrics reporter.", e, ReporterType.METRIC, ReporterSinkType.INFLUXDB));
}
}
if (eventsEnabled) {
String eventsDbProp = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_EVENTS_DATABASE);
String eventsDatabase = (eventsDbProp == null) ? (metricsEnabled ? database : null) : eventsDbProp;
try {
InfluxDBEventReporter eventReporter =
InfluxDBEventReporter.Factory.forContext(RootMetricContext.get())
.withConnectionType(connectionType)
.withConnection(url, username, password, eventsDatabase)
.build();
this.codahaleScheduledReporters.add(this.codahaleReportersCloser.register(eventReporter));
}
catch (IOException e) {
reporterExceptionList.add(new MetricReporterException("Failed to create InfluxDB event reporter.", e, ReporterType.EVENT, ReporterSinkType.INFLUXDB));
}
}
if (!reporterExceptionList.isEmpty()) {
throw new MultiReporterException("Failed to create one or more InfluxDB reporters", reporterExceptionList);
}
}