in uReplicator-Common/src/main/java/com/uber/stream/kafka/mirrormaker/common/utils/C3QueryUtils.java [73:111]
static void extractJsonResults(String jsonStr, List<String> topics, Map<String, TopicWorkload> workloads) {
try {
JSONObject jsonObj = JSON.parseObject(jsonStr);
if (jsonObj == null) {
LOGGER.info("Failed to parse C3 result: " + jsonStr);
return;
}
for (String topic : topics) {
JSONArray arr = jsonObj.getJSONArray(topic);
if (arr == null || arr.size() == 0) {
continue;
}
JSONObject metrics = arr.getJSONObject(0);
if (metrics == null) {
LOGGER.info("Failed to parse C3 result for topic '" + topic + "'");
continue;
}
Long startTimeSec = metrics.getLong("startTimeSec");
Long endTimeSec = metrics.getLong("endTimeSec");
Long totalBytes = metrics.getLong("totalBytes");
Long totalCount = metrics.getLong("totalCount");
if (startTimeSec == null || endTimeSec == null || totalBytes == null || totalCount == null) {
LOGGER.info("Failed to parse C3 result for topic '" + topic + "'");
continue;
}
double period = endTimeSec - startTimeSec;
if (period <= 0) {
LOGGER.info("Invalid C3 result for topic '" + topic + "': startTimeSec=" + startTimeSec + ","
+ " endTimeSec=" + endTimeSec);
continue;
}
TopicWorkload tw = new TopicWorkload(totalBytes / period, totalCount / period);
tw.setLastUpdate(endTimeSec * 1000);
workloads.put(topic, tw);
}
} catch (Exception e) {
LOGGER.error("Failed to extract workload information from JSON: " + jsonStr, e);
}
}