in src/main/java/org/opensearch/performanceanalyzer/reader/MetricProperties.java [224:310]
/**
 * Parses one event and forwards each of its metric lines to {@code processJsonLine}.
 *
 * <p>The event value is treated as line-separated JSON. The first line must carry the writer's
 * timestamp, e.g. {@code {"current_time":1566152878118}}; every following line is handed to
 * {@code processJsonLine} together with a template row pre-filled with the dimensions derived
 * from the event key.
 *
 * @param event event whose {@code key} and {@code value} are read
 * @param snap snapshot whose last-updated time is raised to this event's timestamp when that
 *     timestamp is newer
 * @param startTime inclusive upper bound for the event timestamp
 * @param lastSnapTimestamp exclusive lower bound for the event timestamp
 * @param batchHandle batch bind step passed through to {@code processJsonLine}
 * @return {@code true} if {@code processJsonLine} returned {@code true} for at least one line;
 *     {@code false} if the value is empty or has no lines, the timestamp line cannot be parsed,
 *     the timestamp falls outside the window, or no line was processed
 */
private boolean processEvent(
        Event event,
        MemoryDBSnapshot snap,
        long startTime,
        long lastSnapTimestamp,
        BatchBindStep batchHandle) {
    if (event.value.isEmpty()) {
        return false;
    }
    String[] lines = event.value.split(System.lineSeparator());
    // String.split discards trailing empty strings, so a value consisting solely of line
    // separators produces an empty array. Bail out rather than fail on lines[0].
    if (lines.length == 0) {
        return false;
    }
    // The first line is the last modified time of the file, e.g.
    // {"current_time":1566152878118}
    // We need last modified time in milliseconds. But JDK method
    // File.lastModified() cannot give that precision. So the writer
    // adds the last modified time itself.
    // See:
    // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6939260
    long lastModifiedTime = 0;
    try {
        lastModifiedTime =
                JsonConverter.getLongValue(
                        lines[0], PerformanceAnalyzerMetrics.METRIC_CURRENT_TIME);
    } catch (JsonPathNotFoundException ex) {
        LOG.warn(
                String.format(
                        "Fail to get last modified time of %s ExceptionCode: %s",
                        event.key, StatExceptionCode.JSON_PARSER_ERROR.toString()),
                ex);
        StatsCollector.instance().logException(StatExceptionCode.JSON_PARSER_ERROR);
        return false;
    } catch (JsonProcessingException ex) {
        LOG.warn(
                String.format(
                        "Malformed json (%s) ExceptionCode: %s",
                        lines[0], StatExceptionCode.JSON_PARSER_ERROR.toString()),
                ex);
        StatsCollector.instance().logException(StatExceptionCode.JSON_PARSER_ERROR);
        return false;
    } catch (IOException ex) {
        LOG.warn(
                String.format(
                        "I/O exception processing metric %s with value: %s. ExceptionCode: %s",
                        event.key,
                        lines[0],
                        StatExceptionCode.JSON_PARSER_ERROR.toString()),
                ex);
        StatsCollector.instance().logException(StatExceptionCode.JSON_PARSER_ERROR);
        return false;
    }
    // Only consider metrics if the file has been updated inside the window
    // (lastSnapTimestamp, startTime].
    if (lastModifiedTime > startTime || lastModifiedTime <= lastSnapTimestamp) {
        return false;
    }
    // snap's last updated time is the highest last modified time of all
    // the entries in the snapshot.
    if (snap.getLastUpdatedTime() < lastModifiedTime) {
        snap.setLastUpdatedTime(lastModifiedTime);
    }
    String[] derivedDimension = handler.processExtraDimensions(event.key);
    int numMetrics = derivedDimension.length + directDimensions.length + metadata.length;
    Object[] templateMetricVals = new Object[numMetrics];
    // Derived dimensions occupy the leading slots of the template row; the remaining slots
    // are presumably filled per line by processJsonLine (not visible here).
    System.arraycopy(derivedDimension, 0, templateMetricVals, 0, derivedDimension.length);
    boolean processed = false;
    // Line 0 was the timestamp line handled above, so metric lines start at index 1.
    for (int lineNum = 1; lineNum < lines.length; lineNum++) {
        processed =
                processJsonLine(lines[lineNum], batchHandle, templateMetricVals) || processed;
    }
    return processed;
}