public static void main()

in src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java [81:189]


  /**
   * Reads whitespace-delimited records from standard input, groups them into fixed-width time
   * bins, aggregates the data column per bin according to the requested operation, and logs one
   * line per bin in ascending bin order.
   *
   * <p>
   * For every input line the value in {@code opts.timeColumn} is parsed as a number, truncated to
   * a {@code long}, and reduced to a multiple of {@code opts.period}; that multiple is the bin key.
   * The value in {@code opts.dataColumn} is then folded into the per-bin aggregates. Lines that
   * cannot be processed are logged and skipped.
   *
   * <p>
   * Output lines consist of the bin key formatted with {@code opts.dateFormat} (the key is handed
   * to {@link Date#Date(long)}) followed by the aggregated value(s).
   *
   * @param args command line arguments, parsed into {@code Opts}
   * @throws Exception if argument parsing or reading from standard input fails
   */
  public static void main(String[] args) throws Exception {
    Opts opts = new Opts();
    opts.parseArgs(TimeBinner.class.getName(), args);

    Operation operation = Operation.valueOf(opts.operation);
    SimpleDateFormat sdf = new SimpleDateFormat(opts.dateFormat);

    BufferedReader in = new BufferedReader(new InputStreamReader(System.in, UTF_8));

    String line;

    // aggregation1 holds the primary per-bin value for every operation. For the averaging
    // operations (AVG, AMM, AMM_HACK1) it is paired with aggregation2, which is incremented by 1
    // per line. aggregation3/aggregation4 are only used by AMM and AMM_HACK1 and are fed through
    // updateMin/updateMax respectively.
    HashMap<Long,DoubleWrapper> aggregation1 = new HashMap<>();
    HashMap<Long,DoubleWrapper> aggregation2 = new HashMap<>();
    HashMap<Long,DoubleWrapper> aggregation3 = new HashMap<>();
    HashMap<Long,DoubleWrapper> aggregation4 = new HashMap<>();

    while ((line = in.readLine()) != null) {

      try {
        String[] tokens = line.split("\\s+");

        long time = (long) Double.parseDouble(tokens[opts.timeColumn]);
        double data = Double.parseDouble(tokens[opts.dataColumn]);

        // Integer division drops the remainder, so this snaps the timestamp to its bin key.
        time = (time / opts.period) * opts.period;

        switch (operation) {
          case AMM_HACK1: {
            // NOTE(review): this exception is caught by the catch block below, so a bad
            // --dataColumn is reported once per input line rather than aborting the run.
            if (opts.dataColumn < 2) {
              throw new IllegalArgumentException("--dataColumn must be at least 2");
            }
            // The two columns immediately before the data column supply the values passed to
            // updateMin/updateMax for this line.
            double data_min = Double.parseDouble(tokens[opts.dataColumn - 2]);
            double data_max = Double.parseDouble(tokens[opts.dataColumn - 1]);

            updateMin(time, aggregation3, data, data_min);
            updateMax(time, aggregation4, data, data_max);

            increment(time, aggregation1, data);
            increment(time, aggregation2, 1);
            break;
          }
          case AMM: {
            updateMin(time, aggregation3, data, data);
            updateMax(time, aggregation4, data, data);

            increment(time, aggregation1, data);
            increment(time, aggregation2, 1);
            break;
          }
          case AVG: {
            increment(time, aggregation1, data);
            increment(time, aggregation2, 1);
            break;
          }
          case MAX: {
            updateMax(time, aggregation1, data, data);
            break;
          }
          case MIN: {
            updateMin(time, aggregation1, data, data);
            break;
          }
          case COUNT: {
            increment(time, aggregation1, 1);
            break;
          }
          case SUM:
          case CUMULATIVE: {
            increment(time, aggregation1, data);
            break;
          }
        }

      } catch (Exception e) {
        log.error("Failed to process line: {} {}", line, e.getMessage());
      }
    }

    // Running total for CUMULATIVE; held in an AtomicReference because the lambda below can only
    // capture effectively final locals.
    AtomicReference<Double> cumulative = new AtomicReference<>((double) 0);

    // Map.Entry does not implement Comparable, so a bare sorted() fails with ClassCastException
    // once there is more than one bin. Order the entries explicitly by bin key instead; the
    // CUMULATIVE total depends on this ascending order.
    aggregation1.entrySet().stream()
        .sorted((left, right) -> Long.compare(left.getKey(), right.getKey())).forEach(entry -> {
          final String value;
          final var currentKey = entry.getKey();
          final var currentValue = entry.getValue();

          switch (operation) {
            case AMM_HACK1:
            case AMM: {
              // average, then the aggregation3 and aggregation4 values for the same bin
              DoubleWrapper countdw = aggregation2.get(currentKey);
              value = "" + (currentValue.d / countdw.d) + " " + aggregation3.get(currentKey).d
                  + " " + aggregation4.get(currentKey).d;
              break;
            }
            case AVG: {
              DoubleWrapper countdw = aggregation2.get(currentKey);
              value = "" + (currentValue.d / countdw.d);
              break;
            }
            case CUMULATIVE: {
              cumulative.updateAndGet(v -> v + currentValue.d);
              value = "" + cumulative;
              break;
            }
            default:
              value = "" + currentValue.d;
          }

          log.info("{} {}", sdf.format(new Date(currentKey)), value);
        });
  }