in spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/MetricsSystem.java [94:124]
/**
 * Instantiates every configured metrics sink and adds it to {@code sinks}.
 *
 * <p>For each value in {@code sinkPropertiesMap}, the sink class is loaded by its configured
 * class name, verified to be a {@link Sink} subtype, and constructed reflectively through its
 * public {@code (Properties, MetricRegistry)} constructor using the entry's properties and this
 * system's {@code registry}.
 *
 * @throws IllegalStateException if a sink class cannot be loaded, is not a {@link Sink}, lacks
 *     the expected public constructor, or fails during construction; the underlying failure is
 *     preserved as the cause. Sinks created before the failing entry remain in {@code sinks}.
 */
protected void registerSinks() {
  log.info("sinkPropertiesMap: {}", sinkPropertiesMap);
  sinkPropertiesMap
      .values()
      .forEach(
          sinkProp -> {
            try {
              // asSubclass checks the type before anything is constructed. The previous
              // unchecked cast only failed after instantiation, with a ClassCastException
              // that bypassed the error handling below.
              Class<? extends Sink> sinkClass =
                  Class.forName(sinkProp.getClassName()).asSubclass(Sink.class);
              Sink sinkInstance =
                  sinkClass
                      .getConstructor(Properties.class, MetricRegistry.class)
                      .newInstance(sinkProp.getProperties(), registry);
              sinks.add(sinkInstance);
            } catch (InstantiationException
                | IllegalAccessException
                | IllegalArgumentException
                | InvocationTargetException
                | NoSuchMethodException
                | SecurityException
                | ClassNotFoundException
                | ClassCastException e) {
              if (log.isErrorEnabled()) {
                // The trailing throwable is logged with its stack trace, not as a placeholder.
                log.error(
                    "Fail to create metrics sink for sink name {}, sink properties {}",
                    sinkProp.getClassName(),
                    sinkProp.getProperties(),
                    e);
              }
              throw new IllegalStateException("Fail to create metrics sink", e);
            }
          });
}