in src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java [1381:1537]
/**
 * Configures the sort comparator (and, outside of order-by jobs, the grouping
 * comparator) on {@code job} according to the type of the map output key.
 *
 * <p>Two families of comparators are used:
 * <ul>
 *   <li>If the operator takes part in an order by, only the sort comparator is
 *       set, to one of the Pig raw comparators. In this path a key type that
 *       is not listed in the switch leaves the job untouched.</li>
 *   <li>Otherwise both the sort comparator and the grouping comparator are set
 *       to the Pig writable comparators. In this path a key type that is not
 *       listed is reported as an internal error.</li>
 * </ul>
 *
 * @param mro the map-reduce operator the job is being built for
 * @param keyType the {@link DataType} constant describing the key
 * @param job the Hadoop job whose comparators are configured
 * @throws JobCreationException with error code 1068 if the key is a map or a
 *         bag, or with error code 2036 if the key type is unhandled and the
 *         operator is not involved in an order by
 */
private void selectComparator(
        MapReduceOper mro,
        byte keyType,
        org.apache.hadoop.mapreduce.Job job) throws JobCreationException {
    // If this operator is involved in an order by, use the pig specific raw
    // comparators. If it has a cogroup, we need to set the comparator class
    // to the raw comparator and the grouping comparator class to pig specific
    // raw comparators (which skip the index). Otherwise use the hadoop provided
    // raw comparator.
    // An operator has an order by if global sort is set or if its successor has
    // global sort set (because in that case it's the sampling job) or if
    // it's a limit after a sort.
    boolean hasOrderBy = mro.isGlobalSort()
            || mro.isLimitAfterSort()
            || mro.usingTypedComparator();
    if (!hasOrderBy) {
        List<MapReduceOper> succs = plan.getSuccessors(mro);
        // Guard against an empty list as well as null: get(0) on an empty
        // list would throw IndexOutOfBoundsException instead of simply
        // meaning "no successor". Only the first successor is inspected.
        if (succs != null && !succs.isEmpty()) {
            hasOrderBy = succs.get(0).isGlobalSort();
        }
    }

    if (hasOrderBy) {
        switch (keyType) {
        case DataType.BOOLEAN:
            job.setSortComparatorClass(PigBooleanRawComparator.class);
            break;
        case DataType.INTEGER:
            job.setSortComparatorClass(PigIntRawComparator.class);
            break;
        case DataType.LONG:
            job.setSortComparatorClass(PigLongRawComparator.class);
            break;
        case DataType.FLOAT:
            job.setSortComparatorClass(PigFloatRawComparator.class);
            break;
        case DataType.DOUBLE:
            job.setSortComparatorClass(PigDoubleRawComparator.class);
            break;
        case DataType.DATETIME:
            job.setSortComparatorClass(PigDateTimeRawComparator.class);
            break;
        case DataType.CHARARRAY:
            job.setSortComparatorClass(PigTextRawComparator.class);
            break;
        case DataType.BYTEARRAY:
            job.setSortComparatorClass(PigBytesRawComparator.class);
            break;
        case DataType.BIGINTEGER:
            job.setSortComparatorClass(PigBigIntegerRawComparator.class);
            break;
        case DataType.BIGDECIMAL:
            job.setSortComparatorClass(PigBigDecimalRawComparator.class);
            break;
        case DataType.TUPLE:
            job.setSortComparatorClass(PigTupleSortComparator.class);
            break;
        case DataType.MAP:
            throw new JobCreationException(
                    "Using Map as key not supported.", 1068, PigException.INPUT);
        case DataType.BAG:
            throw new JobCreationException(
                    "Using Bag as key not supported.", 1068, PigException.INPUT);
        default:
            // Unlisted key types are deliberately left without a comparator
            // in the order-by path; no error is raised here.
            break;
        }
        return;
    }

    // Not an order by: set both the sort and the grouping comparator.
    switch (keyType) {
    case DataType.BOOLEAN:
        job.setSortComparatorClass(PigBooleanWritableComparator.class);
        job.setGroupingComparatorClass(PigGroupingBooleanWritableComparator.class);
        break;
    case DataType.INTEGER:
        job.setSortComparatorClass(PigIntWritableComparator.class);
        job.setGroupingComparatorClass(PigGroupingIntWritableComparator.class);
        break;
    case DataType.BIGINTEGER:
        job.setSortComparatorClass(PigBigIntegerWritableComparator.class);
        job.setGroupingComparatorClass(PigGroupingBigIntegerWritableComparator.class);
        break;
    case DataType.BIGDECIMAL:
        job.setSortComparatorClass(PigBigDecimalWritableComparator.class);
        job.setGroupingComparatorClass(PigGroupingBigDecimalWritableComparator.class);
        break;
    case DataType.LONG:
        job.setSortComparatorClass(PigLongWritableComparator.class);
        job.setGroupingComparatorClass(PigGroupingLongWritableComparator.class);
        break;
    case DataType.FLOAT:
        job.setSortComparatorClass(PigFloatWritableComparator.class);
        job.setGroupingComparatorClass(PigGroupingFloatWritableComparator.class);
        break;
    case DataType.DOUBLE:
        job.setSortComparatorClass(PigDoubleWritableComparator.class);
        job.setGroupingComparatorClass(PigGroupingDoubleWritableComparator.class);
        break;
    case DataType.DATETIME:
        job.setSortComparatorClass(PigDateTimeWritableComparator.class);
        job.setGroupingComparatorClass(PigGroupingDateTimeWritableComparator.class);
        break;
    case DataType.CHARARRAY:
        job.setSortComparatorClass(PigCharArrayWritableComparator.class);
        job.setGroupingComparatorClass(PigGroupingCharArrayWritableComparator.class);
        break;
    case DataType.BYTEARRAY:
        job.setSortComparatorClass(PigDBAWritableComparator.class);
        job.setGroupingComparatorClass(PigGroupingDBAWritableComparator.class);
        break;
    case DataType.TUPLE:
        job.setSortComparatorClass(PigTupleWritableComparator.class);
        job.setGroupingComparatorClass(PigGroupingTupleWritableComparator.class);
        break;
    case DataType.MAP:
        throw new JobCreationException(
                "Using Map as key not supported.", 1068, PigException.INPUT);
    case DataType.BAG:
        throw new JobCreationException(
                "Using Bag as key not supported.", 1068, PigException.INPUT);
    default:
        throw new JobCreationException(
                "Unhandled key type " + DataType.findTypeName(keyType),
                2036, PigException.BUG);
    }
}