in flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java [510:555]
/**
 * Builds the {@link LatencyStats} used by this operator from the task manager configuration.
 *
 * <p>The history size is read from {@link MetricOptions#LATENCY_HISTORY_SIZE}; a configured
 * value that is zero or negative is replaced by the option's default. The granularity is read
 * from {@link MetricOptions#LATENCY_SOURCE_GRANULARITY} and parsed case-insensitively; a value
 * that does not name a {@link LatencyStats.Granularity} constant is replaced by {@code
 * OPERATOR}. Both replacements are logged as warnings.
 *
 * <p>If anything else fails while reading the configuration or creating the metric group, the
 * failure is logged and a {@link LatencyStats} backed by an unregistered metric group is
 * returned instead, so this method never propagates an exception.
 *
 * @return the latency statistics, either registered under the job metric group's "latency"
 *     sub-group or, on failure, attached to an unregistered group.
 */
private LatencyStats initializeLatencyStats() {
    try {
        final Configuration tmConfig =
                containingTask.getEnvironment().getTaskManagerInfo().getConfiguration();

        // Resolve the history size, rejecting non-positive values.
        final int requestedHistorySize =
                tmConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
        final int effectiveHistorySize;
        if (requestedHistorySize > 0) {
            effectiveHistorySize = requestedHistorySize;
        } else {
            LOG.warn(
                    "{} has been set to a value equal or below 0: {}. Using default.",
                    MetricOptions.LATENCY_HISTORY_SIZE,
                    requestedHistorySize);
            effectiveHistorySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
        }

        // Resolve the granularity; only an unknown enum name is handled here, any other
        // failure is left to the outer handler.
        final String granularityName =
                tmConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY);
        LatencyStats.Granularity resolvedGranularity;
        try {
            final String normalizedName = granularityName.toUpperCase(Locale.ROOT);
            resolvedGranularity = LatencyStats.Granularity.valueOf(normalizedName);
        } catch (IllegalArgumentException unknownGranularity) {
            resolvedGranularity = LatencyStats.Granularity.OPERATOR;
            LOG.warn(
                    "Configured value {} option for {} is invalid. Defaulting to {}.",
                    granularityName,
                    MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
                    resolvedGranularity);
        }

        final MetricGroup jobGroup = this.metrics.getJobMetricGroup();
        final MetricGroup latencyGroup = jobGroup.addGroup("latency");
        return new LatencyStats(
                latencyGroup,
                effectiveHistorySize,
                containingTask.getIndexInSubtaskGroup(),
                getOperatorID(),
                resolvedGranularity);
    } catch (Exception e) {
        LOG.warn("An error occurred while instantiating latency metrics.", e);

        // Best-effort fallback: latency stats that are not registered with any reporter.
        final MetricGroup unregisteredLatencyGroup =
                UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup()
                        .addGroup("latency");
        return new LatencyStats(
                unregisteredLatencyGroup,
                1,
                0,
                new OperatorID(),
                LatencyStats.Granularity.SINGLE);
    }
}