in datafu-pig/src/main/java/datafu/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java [201:262]
/**
 * Scans the bag held in the first field of {@code input} and returns the
 * tuple whose {@code pind}-th field is extremal.
 *
 * <p>Tuples that are {@code null}, or whose comparison field is
 * {@code null}, are ignored. When several tuples share the extremal
 * value, the first one encountered during iteration is kept, because a
 * later tuple only replaces the current best on a strictly positive
 * signed comparison.
 *
 * @param pind     index handed directly to {@link Tuple#get(int)} to fetch
 *                 the comparison field of each tuple in the bag
 * @param psign    multiplier applied to the comparison result; expected to
 *                 be {@code 1} (keep the largest) or {@code -1} (keep the
 *                 smallest)
 * @param input    tuple whose first field is the bag to scan
 * @param reporter progress callback, pinged every
 *                 {@code PROGRESS_FREQUENCY} tuples; may be {@code null}
 * @return the extremal tuple, or {@code null} when {@code input} is
 *         {@code null} or empty, its first field is {@code null}, the bag
 *         is empty, or no tuple carries a non-null comparison field
 * @throws ExecException rethrown as-is when raised while reading a field;
 *                       any other exception raised while reading or
 *                       comparing a field is wrapped in a new one
 */
protected final static Tuple extreme(int pind, int psign, Tuple input,
        PigProgressable reporter) throws ExecException {
    if (input == null || input.size() == 0) {
        return null;
    }
    Object bagField = input.get(0);
    if (bagField == null) {
        return null;
    }

    DataBag values = (DataBag) bagField;
    // An empty bag yields NULL, as the SQL standard prescribes for
    // aggregates over empty sets.
    if (values.size() == 0) {
        return null;
    }

    java.lang.Comparable bestKey = null;
    Tuple bestTuple = null;
    int seen = 0;

    for (Tuple candidate : values) {
        // The counter only advances when there is a reporter to notify.
        if (reporter != null) {
            seen++;
            if (seen % PROGRESS_FREQUENCY == 0) {
                reporter.progress();
            }
        }

        // Defensive: skip null entries in the bag.
        if (candidate == null) {
            continue;
        }

        try {
            Object field = candidate.get(pind);
            // A tuple with a null comparison field can never be the
            // result, so it is not compared at all.
            if (field != null) {
                java.lang.Comparable key = (java.lang.Comparable) field;
                // The signed comparison is positive iff
                //   (psign == 1 && key > bestKey) || (psign == -1 && key < bestKey)
                // and in both cases the candidate becomes the new best.
                // The very first non-null key is accepted unconditionally.
                if (bestKey == null || psign * key.compareTo(bestKey) > 0) {
                    bestKey = key;
                    bestTuple = candidate;
                }
            }
        } catch (ExecException ee) {
            // Pig's own exceptions propagate untouched.
            throw ee;
        } catch (Exception e) {
            // Anything else (e.g. a failed cast to Comparable) is wrapped,
            // preserving the cause.
            throw new ExecException(
                    "Error while computing ExtremalTupleByNthField in ExtremalTupleByNthField,",
                    -1, PigException.ERROR, e);
        }
    }

    return bestTuple;
}