in src/main/java/org/apache/accumulo/testing/continuous/TimeBinner.java [81:189]
public static void main(String[] args) throws Exception {
Opts opts = new Opts();
opts.parseArgs(TimeBinner.class.getName(), args);
Operation operation = Operation.valueOf(opts.operation);
SimpleDateFormat sdf = new SimpleDateFormat(opts.dateFormat);
BufferedReader in = new BufferedReader(new InputStreamReader(System.in, UTF_8));
String line;
HashMap<Long,DoubleWrapper> aggregation1 = new HashMap<>();
HashMap<Long,DoubleWrapper> aggregation2 = new HashMap<>();
HashMap<Long,DoubleWrapper> aggregation3 = new HashMap<>();
HashMap<Long,DoubleWrapper> aggregation4 = new HashMap<>();
while ((line = in.readLine()) != null) {
try {
String[] tokens = line.split("\\s+");
long time = (long) Double.parseDouble(tokens[opts.timeColumn]);
double data = Double.parseDouble(tokens[opts.dataColumn]);
time = (time / opts.period) * opts.period;
switch (operation) {
case AMM_HACK1: {
if (opts.dataColumn < 2) {
throw new IllegalArgumentException("--dataColumn must be at least 2");
}
double data_min = Double.parseDouble(tokens[opts.dataColumn - 2]);
double data_max = Double.parseDouble(tokens[opts.dataColumn - 1]);
updateMin(time, aggregation3, data, data_min);
updateMax(time, aggregation4, data, data_max);
increment(time, aggregation1, data);
increment(time, aggregation2, 1);
break;
}
case AMM: {
updateMin(time, aggregation3, data, data);
updateMax(time, aggregation4, data, data);
increment(time, aggregation1, data);
increment(time, aggregation2, 1);
break;
}
case AVG: {
increment(time, aggregation1, data);
increment(time, aggregation2, 1);
break;
}
case MAX: {
updateMax(time, aggregation1, data, data);
break;
}
case MIN: {
updateMin(time, aggregation1, data, data);
break;
}
case COUNT: {
increment(time, aggregation1, 1);
break;
}
case SUM:
case CUMULATIVE: {
increment(time, aggregation1, data);
break;
}
}
} catch (Exception e) {
log.error("Failed to process line: {} {}", line, e.getMessage());
}
}
AtomicReference<Double> cumulative = new AtomicReference<>((double) 0);
aggregation1.entrySet().stream().sorted().forEach(entry -> {
final String value;
final var currentKey = entry.getKey();
final var currentValue = entry.getValue();
switch (operation) {
case AMM_HACK1:
case AMM: {
DoubleWrapper countdw = aggregation2.get(currentKey);
value = "" + (currentValue.d / countdw.d) + " " + aggregation3.get(currentKey).d + " "
+ aggregation4.get(currentKey).d;
break;
}
case AVG: {
DoubleWrapper countdw = aggregation2.get(currentKey);
value = "" + (currentValue.d / countdw.d);
break;
}
case CUMULATIVE: {
cumulative.updateAndGet(v -> v + currentValue.d);
value = "" + cumulative;
break;
}
default:
value = "" + currentValue.d;
}
log.info("{} {}", sdf.format(new Date(currentKey)), value);
});
}