in contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java [310:438]
/**
 * Performs the one-time setup of this UDF from an input tuple.
 *
 * <p>The tuple is read positionally (arg 1, at index 0, is not read by this
 * method):
 * <ul>
 *   <li>arg 2 (index 1): name of the aggregate / window function, matched
 *       case-insensitively against the names listed below;</li>
 *   <li>arg 3 (index 2, optional): number of preceding rows; defaults to
 *       -1, which the original comments describe as unbounded preceding;</li>
 *   <li>arg 4 (index 3, optional): number of following rows; defaults to
 *       0, which the original comments describe as the current row;</li>
 *   <li>args 5+ (index 4 onward, optional): copied into {@code udfArgs} and
 *       handed to those functions whose constructors accept them.</li>
 * </ul>
 *
 * <p>{@code initialized} is set only once setup has fully succeeded, so a
 * failed attempt does not leave the instance flagged as ready while
 * {@code func} is still unset.
 *
 * @param input the tuple carrying the arguments described above
 * @throws IOException if an argument has the wrong type, if a window bound
 *         is null, if the aggregate name is not recognised, or if reading
 *         the tuple fails
 */
private void init(Tuple input) throws IOException {
    // Look for the aggregate in arg 2. A null name is deliberately let
    // through here; it is rejected below as an unknown aggregate.
    Object aggArg = input.get(1);
    if (aggArg != null && !(aggArg instanceof String)) {
        int errCode = 2107; // TODO not sure this is the right one
        String msg = "Over expected a string for arg 2 but received " +
            DataType.findTypeName(aggArg);
        throw new ExecException(msg, errCode, PigException.INPUT);
    }
    agg = (String)aggArg;

    // See if there is a preceding value specified, if not, unbounded
    // preceding is the default
    rowsBefore = -1;
    if (input.size() > 2) {
        rowsBefore = readIntArg(input, 2);
    }

    // See if there is a following value specified, if not, current row
    // is the default
    rowsAfter = 0;
    if (input.size() > 3) {
        rowsAfter = readIntArg(input, 3);
    }

    // Place any additional arguments in the udfArgs array to be passed
    // to the UDF each time
    int extraArgCount = input.size() - 4;
    if (extraArgCount > 0) {
        udfArgs = new Object[extraArgCount];
        for (int i = 0; i < extraArgCount; i++) {
            udfArgs[i] = input.get(i + 4);
        }
    }

    // Map the requested name onto its implementation.
    if ("count".equalsIgnoreCase(agg)) {
        func = new COUNT();
    } else if ("sum(double)".equalsIgnoreCase(agg) ||
            "sum(float)".equalsIgnoreCase(agg)) {
        func = new DoubleSum();
    } else if ("sum(int)".equalsIgnoreCase(agg) ||
            "sum(long)".equalsIgnoreCase(agg)) {
        func = new LongSum();
    } else if ("sum(bytearray)".equalsIgnoreCase(agg)) {
        func = new SUM();
    } else if ("sum(bigdecimal)".equalsIgnoreCase(agg)) {
        func = new BigDecimalSum();
    } else if ("avg(double)".equalsIgnoreCase(agg)) {
        func = new DoubleAvg();
    } else if ("avg(float)".equalsIgnoreCase(agg)) {
        func = new FloatAvg();
    } else if ("avg(long)".equalsIgnoreCase(agg)) {
        func = new LongAvg();
    } else if ("avg(int)".equalsIgnoreCase(agg)) {
        func = new IntAvg();
    } else if ("avg(bytearray)".equalsIgnoreCase(agg)) {
        func = new AVG();
    } else if ("avg(bigdecimal)".equalsIgnoreCase(agg)) {
        func = new BigDecimalAvg();
    } else if ("min(double)".equalsIgnoreCase(agg)) {
        func = new DoubleMin();
    } else if ("min(float)".equalsIgnoreCase(agg)) {
        func = new FloatMin();
    } else if ("min(long)".equalsIgnoreCase(agg)) {
        func = new LongMin();
    } else if ("min(int)".equalsIgnoreCase(agg)) {
        func = new IntMin();
    } else if ("min(chararray)".equalsIgnoreCase(agg)) {
        func = new StringMin();
    } else if ("min(bytearray)".equalsIgnoreCase(agg)) {
        func = new MIN();
    } else if ("min(bigdecimal)".equalsIgnoreCase(agg)) {
        func = new BigDecimalMin();
    } else if ("max(double)".equalsIgnoreCase(agg)) {
        func = new DoubleMax();
    } else if ("max(float)".equalsIgnoreCase(agg)) {
        func = new FloatMax();
    } else if ("max(long)".equalsIgnoreCase(agg)) {
        func = new LongMax();
    } else if ("max(int)".equalsIgnoreCase(agg)) {
        func = new IntMax();
    } else if ("max(chararray)".equalsIgnoreCase(agg)) {
        func = new StringMax();
    } else if ("max(bytearray)".equalsIgnoreCase(agg)) {
        func = new MAX();
    } else if ("max(bigdecimal)".equalsIgnoreCase(agg)) {
        func = new BigDecimalMax();
    } else if ("row_number".equalsIgnoreCase(agg)) {
        func = new RowNumber();
    } else if ("first_value".equalsIgnoreCase(agg)) {
        func = new FirstValue();
    } else if ("last_value".equalsIgnoreCase(agg)) {
        func = new LastValue();
    } else if ("lead".equalsIgnoreCase(agg)) {
        func = new Lead(udfArgs);
    } else if ("lag".equalsIgnoreCase(agg)) {
        func = new Lag(udfArgs);
    } else if ("rank".equalsIgnoreCase(agg)) {
        func = new Rank(udfArgs);
    } else if ("dense_rank".equalsIgnoreCase(agg)) {
        func = new DenseRank(udfArgs);
    } else if ("ntile".equalsIgnoreCase(agg)) {
        func = new Ntile(udfArgs);
    } else if ("percent_rank".equalsIgnoreCase(agg)) {
        func = new PercentRank(udfArgs);
    } else if ("cume_dist".equalsIgnoreCase(agg)) {
        // Unlike the other ranking functions, CumeDist is constructed
        // without udfArgs.
        func = new CumeDist();
    } else if ("debug".equalsIgnoreCase(agg)) {
        func = new Debug();
    } else {
        throw new ExecException("Unknown aggregate " + agg);
    }

    // Only flag the instance as ready once everything above succeeded.
    initialized = true;
}

/**
 * Reads the tuple field at {@code index} and returns it as an int.
 *
 * <p>Both a wrong type and a null value are reported with the same
 * descriptive error; a null would otherwise pass a plain
 * {@code (Integer)} cast unchecked.
 *
 * @param input the tuple to read from
 * @param index zero-based position of the field; error messages report
 *        it one-based, as {@code index + 1}
 * @return the integer stored at {@code index}
 * @throws IOException if the field is null or not an {@code Integer}, or
 *         if reading the tuple fails
 */
private int readIntArg(Tuple input, int index) throws IOException {
    Object value = input.get(index);
    if (!(value instanceof Integer)) {
        int errCode = 2107; // TODO not sure this is the right one
        String msg = "Over expected an integer for arg " + (index + 1) +
            " but received " + DataType.findTypeName(value);
        throw new ExecException(msg, errCode, PigException.INPUT);
    }
    return (Integer)value;
}