private void selectComparator()

in src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java [1381:1537]


    /**
     * Chooses the comparator classes for {@code job} based on the key type and on
     * whether {@code mro} takes part in an order by.
     *
     * <p>If the operator is involved in an order by, the Pig specific raw sort
     * comparator for the key type is set and nothing else. Otherwise both a sort
     * comparator and a grouping comparator are set, using the Pig writable
     * comparators for the key type.
     *
     * @param mro the map-reduce operator the job is being built for
     * @param keyType the {@link DataType} byte constant of the key
     * @param job the Hadoop job whose comparator classes are configured
     * @throws JobCreationException with error code 1068 if the key type is a map
     *         or a bag, or with error code 2036 if the operator is not involved in
     *         an order by and the key type is not handled
     */
    private void selectComparator(
            MapReduceOper mro,
            byte keyType,
            org.apache.hadoop.mapreduce.Job job) throws JobCreationException {
        if (involvesOrderBy(mro)) {
            setOrderByComparator(keyType, job);
        } else {
            setSortAndGroupingComparators(keyType, job);
        }
    }

    /**
     * Tells whether {@code mro} should be treated as part of an order by.
     *
     * <p>An operator has an order by if global sort is set, if it is a limit after
     * a sort, if it uses a typed comparator, or if its successor has global sort
     * set (because in that case it is the sampling job).
     */
    private boolean involvesOrderBy(MapReduceOper mro) {
        if (mro.isGlobalSort() || mro.isLimitAfterSort() || mro.usingTypedComparator()) {
            return true;
        }
        List<MapReduceOper> succs = plan.getSuccessors(mro);
        // Only the first successor is consulted. The emptiness check keeps an
        // empty (non-null) successor list from failing in get(0).
        return succs != null && !succs.isEmpty() && succs.get(0).isGlobalSort();
    }

    /**
     * Sets the Pig specific raw sort comparator matching {@code keyType}. The
     * grouping comparator is not touched on this path.
     */
    private void setOrderByComparator(
            byte keyType,
            org.apache.hadoop.mapreduce.Job job) throws JobCreationException {
        switch (keyType) {
        case DataType.BOOLEAN:
            job.setSortComparatorClass(PigBooleanRawComparator.class);
            break;

        case DataType.INTEGER:
            job.setSortComparatorClass(PigIntRawComparator.class);
            break;

        case DataType.LONG:
            job.setSortComparatorClass(PigLongRawComparator.class);
            break;

        case DataType.FLOAT:
            job.setSortComparatorClass(PigFloatRawComparator.class);
            break;

        case DataType.DOUBLE:
            job.setSortComparatorClass(PigDoubleRawComparator.class);
            break;

        case DataType.DATETIME:
            job.setSortComparatorClass(PigDateTimeRawComparator.class);
            break;

        case DataType.CHARARRAY:
            job.setSortComparatorClass(PigTextRawComparator.class);
            break;

        case DataType.BYTEARRAY:
            job.setSortComparatorClass(PigBytesRawComparator.class);
            break;

        case DataType.BIGINTEGER:
            job.setSortComparatorClass(PigBigIntegerRawComparator.class);
            break;

        case DataType.BIGDECIMAL:
            job.setSortComparatorClass(PigBigDecimalRawComparator.class);
            break;

        case DataType.MAP:
            throw unsupportedKey("Using Map as key not supported.");

        case DataType.TUPLE:
            job.setSortComparatorClass(PigTupleSortComparator.class);
            break;

        case DataType.BAG:
            throw unsupportedKey("Using Bag as key not supported.");

        default:
            // No comparator is set here for any other key type.
            // NOTE(review): the non-order-by path raises error 2036 for an
            // unhandled key type instead - confirm this asymmetry is intentional.
            break;
        }
    }

    /**
     * Sets both the sort comparator and the grouping comparator to the Pig
     * writable comparators matching {@code keyType}.
     */
    private void setSortAndGroupingComparators(
            byte keyType,
            org.apache.hadoop.mapreduce.Job job) throws JobCreationException {
        switch (keyType) {
        case DataType.BOOLEAN:
            job.setSortComparatorClass(PigBooleanWritableComparator.class);
            job.setGroupingComparatorClass(PigGroupingBooleanWritableComparator.class);
            break;

        case DataType.INTEGER:
            job.setSortComparatorClass(PigIntWritableComparator.class);
            job.setGroupingComparatorClass(PigGroupingIntWritableComparator.class);
            break;

        case DataType.BIGINTEGER:
            job.setSortComparatorClass(PigBigIntegerWritableComparator.class);
            job.setGroupingComparatorClass(PigGroupingBigIntegerWritableComparator.class);
            break;

        case DataType.BIGDECIMAL:
            job.setSortComparatorClass(PigBigDecimalWritableComparator.class);
            job.setGroupingComparatorClass(PigGroupingBigDecimalWritableComparator.class);
            break;

        case DataType.LONG:
            job.setSortComparatorClass(PigLongWritableComparator.class);
            job.setGroupingComparatorClass(PigGroupingLongWritableComparator.class);
            break;

        case DataType.FLOAT:
            job.setSortComparatorClass(PigFloatWritableComparator.class);
            job.setGroupingComparatorClass(PigGroupingFloatWritableComparator.class);
            break;

        case DataType.DOUBLE:
            job.setSortComparatorClass(PigDoubleWritableComparator.class);
            job.setGroupingComparatorClass(PigGroupingDoubleWritableComparator.class);
            break;

        case DataType.DATETIME:
            job.setSortComparatorClass(PigDateTimeWritableComparator.class);
            job.setGroupingComparatorClass(PigGroupingDateTimeWritableComparator.class);
            break;

        case DataType.CHARARRAY:
            job.setSortComparatorClass(PigCharArrayWritableComparator.class);
            job.setGroupingComparatorClass(PigGroupingCharArrayWritableComparator.class);
            break;

        case DataType.BYTEARRAY:
            job.setSortComparatorClass(PigDBAWritableComparator.class);
            job.setGroupingComparatorClass(PigGroupingDBAWritableComparator.class);
            break;

        case DataType.MAP:
            throw unsupportedKey("Using Map as key not supported.");

        case DataType.TUPLE:
            job.setSortComparatorClass(PigTupleWritableComparator.class);
            job.setGroupingComparatorClass(PigGroupingTupleWritableComparator.class);
            break;

        case DataType.BAG:
            throw unsupportedKey("Using Bag as key not supported.");

        default:
            throw unhandledKey(keyType);
        }
    }

    /** Builds the error-code-1068 input exception raised for map and bag keys. */
    private JobCreationException unsupportedKey(String msg) {
        int errCode = 1068;
        return new JobCreationException(msg, errCode, PigException.INPUT);
    }

    /** Builds the error-code-2036 bug exception raised for a key type with no comparator. */
    private JobCreationException unhandledKey(byte keyType) {
        int errCode = 2036;
        String msg = "Unhandled key type " + DataType.findTypeName(keyType);
        return new JobCreationException(msg, errCode, PigException.BUG);
    }