private int compareDefaultTuple()

in src/org/apache/pig/data/DefaultTuple.java [267:432]


        private int compareDefaultTuple(ByteBuffer bb1, ByteBuffer bb2, boolean topLevel) {
            mHasNullField = false;
            // store the position in case of deserialization
            int s1 = bb1.position();
            int s2 = bb2.position();
            int rc = 0;
            byte tupleType1 = bb1.get();
            byte tupleType2 = bb2.get();
            assert (tupleType1 == tupleType2 && tupleType1 == DataType.TUPLE);
            // first compare sizes
            int sz1 = bb1.getInt();
            int sz2 = bb2.getInt();
            if (sz1 > sz2) {
                return 1;
            } else if (sz1 < sz2) {
                return -1;
            } else {
                // if sizes are the same, compare field by field
                for (int i = 0; i < sz1 && rc == 0; i++) {
                    byte dt1 = bb1.get();
                    byte dt2 = bb2.get();
                    if (dt1 == dt2) {
                        switch (dt1) {
                        case DataType.NULL:
                            if (topLevel) // we are scanning the top-level Tuple (original call)
                                mHasNullField = true;
                            rc = 0;
                            break;
                        case DataType.BOOLEAN:
                        case DataType.BYTE:
                            byte bv1 = bb1.get();
                            byte bv2 = bb2.get();
                            rc = (bv1 < bv2 ? -1 : (bv1 == bv2 ? 0 : 1));
                            break;
                        case DataType.INTEGER:
                            int iv1 = bb1.getInt();
                            int iv2 = bb2.getInt();
                            rc = (iv1 < iv2 ? -1 : (iv1 == iv2 ? 0 : 1));
                            break;
                        case DataType.LONG:
                            long lv1 = bb1.getLong();
                            long lv2 = bb2.getLong();
                            rc = (lv1 < lv2 ? -1 : (lv1 == lv2 ? 0 : 1));
                            break;
                        case DataType.FLOAT:
                            float fv1 = bb1.getFloat();
                            float fv2 = bb2.getFloat();
                            rc = Float.compare(fv1, fv2);
                            break;
                        case DataType.DOUBLE:
                            double dv1 = bb1.getDouble();
                            double dv2 = bb2.getDouble();
                            rc = Double.compare(dv1, dv2);
                            break;
                        case DataType.BIGINTEGER: {
                            if (bb1.get() != DataType.BYTEARRAY || bb2.get() != DataType.BYTEARRAY) {
                                throw new RuntimeException("Issue in comparing raw bytes for DefaultTuple! BIGINTEGER was not serialized with BYTEARRAY");
                            }

                            int basz1 = bb1.getInt();
                            int basz2 = bb2.getInt();
                            byte[] ba1 = new byte[basz1];
                            byte[] ba2 = new byte[basz2];
                            bb1.get(ba1);
                            bb2.get(ba2);
                            rc = new BigInteger(ba1).compareTo(new BigInteger(ba2));
                            break;
                        }
                        case DataType.BIGDECIMAL: {
                            byte catype1 = bb1.get();
                            byte catype2 = bb2.get();
                            int casz1 = (catype1 == DataType.CHARARRAY) ? bb1.getShort() : bb1.getInt();
                            int casz2 = (catype2 == DataType.CHARARRAY) ? bb2.getShort() : bb2.getInt();
                            byte[] ca1 = new byte[casz1];
                            byte[] ca2 = new byte[casz2];
                            bb1.get(ca1);
                            bb2.get(ca2);
                            String str1 = null,
                            str2 = null;
                            try {
                                str1 = new String(ca1, DataReaderWriter.UTF8);
                                str2 = new String(ca2, DataReaderWriter.UTF8);
                            } catch (UnsupportedEncodingException uee) {
                                mLog.warn("Unsupported string encoding", uee);
                                uee.printStackTrace();
                            }
                            if (str1 != null && str2 != null)
                                rc = new BigDecimal(str1).compareTo(new BigDecimal(str2));
                            break;
                        }
                        case DataType.DATETIME:
                            long dtv1 = bb1.getLong();
                            bb1.position(bb1.position() + 2); // move cursor forward without read the timezone bytes
                            long dtv2 = bb2.getLong();
                            bb2.position(bb2.position() + 2);
                            rc = (dtv1 < dtv2 ? -1 : (dtv1 == dtv2 ? 0 : 1));
                            break;
                        case DataType.BYTEARRAY:
                            int basz1 = bb1.getInt();
                            int basz2 = bb2.getInt();
                            byte[] ba1 = new byte[basz1];
                            byte[] ba2 = new byte[basz2];
                            bb1.get(ba1);
                            bb2.get(ba2);
                            rc = DataByteArray.compare(ba1, ba2);
                            break;
                        case DataType.CHARARRAY:
                        case DataType.BIGCHARARRAY:
                            int casz1 = (dt1 == DataType.CHARARRAY) ? bb1.getShort() : bb1.getInt();
                            int casz2 = (dt1 == DataType.CHARARRAY) ? bb2.getShort() : bb2.getInt();
                            byte[] ca1 = new byte[casz1];
                            byte[] ca2 = new byte[casz2];
                            bb1.get(ca1);
                            bb2.get(ca2);
                            String str1 = null,
                            str2 = null;
                            try {
                                str1 = new String(ca1, DataReaderWriter.UTF8);
                                str2 = new String(ca2, DataReaderWriter.UTF8);
                            } catch (UnsupportedEncodingException uee) {
                                mLog.warn("Unsupported string encoding", uee);
                                uee.printStackTrace();
                            }
                            if (str1 != null && str2 != null)
                                rc = str1.compareTo(str2);
                            break;
                        case DataType.TUPLE:
                            // put back the cursor to before DataType.TUPLE
                            bb1.position(bb1.position() - 1);
                            bb2.position(bb2.position() - 1);
                            rc = compareDefaultTuple(bb1, bb2, false);
                            break;
                        default:
                            mLog.info("Unsupported DataType for binary comparison, switching to object deserialization: "
                                    + DataType.genTypeToNameMap().get(dt1) + "(" + dt1 + ")");
                            Tuple t1 = mFact.newTuple();
                            Tuple t2 = mFact.newTuple();
                            try {
                                t1.readFields(new DataInputStream(
                                        new ByteArrayInputStream(bb1.array(), s1, bb1.limit())));
                                t2.readFields(new DataInputStream(
                                        new ByteArrayInputStream(bb2.array(), s2, bb2.limit())));
                            } catch (IOException ioe) {
                                mLog.error("Unable to instantiate tuples for comparison: " + ioe.getMessage());
                                throw new RuntimeException(ioe.getMessage(), ioe);
                            }
                            // delegate to compareTuple
                            return compareTuple(t1, t2);
                        }
                    } else { // compare DataTypes
                        if (dt1 < dt2)
                            rc = -1;
                        else
                            rc = 1;
                    }
                    // flip if the order is descending
                    if (rc != 0) {
                        if (!mWholeTuple && !mAsc[i])
                            rc *= -1;
                        else if (mWholeTuple && !mAsc[0])
                            rc *= -1;
                    }
                }
            }
            return rc;
        }