static void extractJsonResults()

in uReplicator-Common/src/main/java/com/uber/stream/kafka/mirrormaker/common/utils/C3QueryUtils.java [73:111]


  static void extractJsonResults(String jsonStr, List<String> topics, Map<String, TopicWorkload> workloads) {
    try {
      JSONObject jsonObj = JSON.parseObject(jsonStr);
      if (jsonObj == null) {
        LOGGER.info("Failed to parse C3 result: " + jsonStr);
        return;
      }
      for (String topic : topics) {
        JSONArray arr = jsonObj.getJSONArray(topic);
        if (arr == null || arr.size() == 0) {
          continue;
        }
        JSONObject metrics = arr.getJSONObject(0);
        if (metrics == null) {
          LOGGER.info("Failed to parse C3 result for topic '" + topic + "'");
          continue;
        }
        Long startTimeSec = metrics.getLong("startTimeSec");
        Long endTimeSec = metrics.getLong("endTimeSec");
        Long totalBytes = metrics.getLong("totalBytes");
        Long totalCount = metrics.getLong("totalCount");
        if (startTimeSec == null || endTimeSec == null || totalBytes == null || totalCount == null) {
          LOGGER.info("Failed to parse C3 result for topic '" + topic + "'");
          continue;
        }
        double period = endTimeSec - startTimeSec;
        if (period <= 0) {
          LOGGER.info("Invalid C3 result for topic '" + topic + "': startTimeSec=" + startTimeSec + ","
              + " endTimeSec=" + endTimeSec);
          continue;
        }
        TopicWorkload tw = new TopicWorkload(totalBytes / period, totalCount / period);
        tw.setLastUpdate(endTimeSec * 1000);
        workloads.put(topic, tw);
      }
    } catch (Exception e) {
      LOGGER.error("Failed to extract workload information from JSON: " + jsonStr, e);
    }
  }