in src/org/apache/pig/data/DefaultTuple.java [267:432]
private int compareDefaultTuple(ByteBuffer bb1, ByteBuffer bb2, boolean topLevel) {
mHasNullField = false;
// store the position in case of deserialization
int s1 = bb1.position();
int s2 = bb2.position();
int rc = 0;
byte tupleType1 = bb1.get();
byte tupleType2 = bb2.get();
assert (tupleType1 == tupleType2 && tupleType1 == DataType.TUPLE);
// first compare sizes
int sz1 = bb1.getInt();
int sz2 = bb2.getInt();
if (sz1 > sz2) {
return 1;
} else if (sz1 < sz2) {
return -1;
} else {
// if sizes are the same, compare field by field
for (int i = 0; i < sz1 && rc == 0; i++) {
byte dt1 = bb1.get();
byte dt2 = bb2.get();
if (dt1 == dt2) {
switch (dt1) {
case DataType.NULL:
if (topLevel) // we are scanning the top-level Tuple (original call)
mHasNullField = true;
rc = 0;
break;
case DataType.BOOLEAN:
case DataType.BYTE:
byte bv1 = bb1.get();
byte bv2 = bb2.get();
rc = (bv1 < bv2 ? -1 : (bv1 == bv2 ? 0 : 1));
break;
case DataType.INTEGER:
int iv1 = bb1.getInt();
int iv2 = bb2.getInt();
rc = (iv1 < iv2 ? -1 : (iv1 == iv2 ? 0 : 1));
break;
case DataType.LONG:
long lv1 = bb1.getLong();
long lv2 = bb2.getLong();
rc = (lv1 < lv2 ? -1 : (lv1 == lv2 ? 0 : 1));
break;
case DataType.FLOAT:
float fv1 = bb1.getFloat();
float fv2 = bb2.getFloat();
rc = Float.compare(fv1, fv2);
break;
case DataType.DOUBLE:
double dv1 = bb1.getDouble();
double dv2 = bb2.getDouble();
rc = Double.compare(dv1, dv2);
break;
case DataType.BIGINTEGER: {
if (bb1.get() != DataType.BYTEARRAY || bb2.get() != DataType.BYTEARRAY) {
throw new RuntimeException("Issue in comparing raw bytes for DefaultTuple! BIGINTEGER was not serialized with BYTEARRAY");
}
int basz1 = bb1.getInt();
int basz2 = bb2.getInt();
byte[] ba1 = new byte[basz1];
byte[] ba2 = new byte[basz2];
bb1.get(ba1);
bb2.get(ba2);
rc = new BigInteger(ba1).compareTo(new BigInteger(ba2));
break;
}
case DataType.BIGDECIMAL: {
byte catype1 = bb1.get();
byte catype2 = bb2.get();
int casz1 = (catype1 == DataType.CHARARRAY) ? bb1.getShort() : bb1.getInt();
int casz2 = (catype2 == DataType.CHARARRAY) ? bb2.getShort() : bb2.getInt();
byte[] ca1 = new byte[casz1];
byte[] ca2 = new byte[casz2];
bb1.get(ca1);
bb2.get(ca2);
String str1 = null,
str2 = null;
try {
str1 = new String(ca1, DataReaderWriter.UTF8);
str2 = new String(ca2, DataReaderWriter.UTF8);
} catch (UnsupportedEncodingException uee) {
mLog.warn("Unsupported string encoding", uee);
uee.printStackTrace();
}
if (str1 != null && str2 != null)
rc = new BigDecimal(str1).compareTo(new BigDecimal(str2));
break;
}
case DataType.DATETIME:
long dtv1 = bb1.getLong();
bb1.position(bb1.position() + 2); // move cursor forward without read the timezone bytes
long dtv2 = bb2.getLong();
bb2.position(bb2.position() + 2);
rc = (dtv1 < dtv2 ? -1 : (dtv1 == dtv2 ? 0 : 1));
break;
case DataType.BYTEARRAY:
int basz1 = bb1.getInt();
int basz2 = bb2.getInt();
byte[] ba1 = new byte[basz1];
byte[] ba2 = new byte[basz2];
bb1.get(ba1);
bb2.get(ba2);
rc = DataByteArray.compare(ba1, ba2);
break;
case DataType.CHARARRAY:
case DataType.BIGCHARARRAY:
int casz1 = (dt1 == DataType.CHARARRAY) ? bb1.getShort() : bb1.getInt();
int casz2 = (dt1 == DataType.CHARARRAY) ? bb2.getShort() : bb2.getInt();
byte[] ca1 = new byte[casz1];
byte[] ca2 = new byte[casz2];
bb1.get(ca1);
bb2.get(ca2);
String str1 = null,
str2 = null;
try {
str1 = new String(ca1, DataReaderWriter.UTF8);
str2 = new String(ca2, DataReaderWriter.UTF8);
} catch (UnsupportedEncodingException uee) {
mLog.warn("Unsupported string encoding", uee);
uee.printStackTrace();
}
if (str1 != null && str2 != null)
rc = str1.compareTo(str2);
break;
case DataType.TUPLE:
// put back the cursor to before DataType.TUPLE
bb1.position(bb1.position() - 1);
bb2.position(bb2.position() - 1);
rc = compareDefaultTuple(bb1, bb2, false);
break;
default:
mLog.info("Unsupported DataType for binary comparison, switching to object deserialization: "
+ DataType.genTypeToNameMap().get(dt1) + "(" + dt1 + ")");
Tuple t1 = mFact.newTuple();
Tuple t2 = mFact.newTuple();
try {
t1.readFields(new DataInputStream(
new ByteArrayInputStream(bb1.array(), s1, bb1.limit())));
t2.readFields(new DataInputStream(
new ByteArrayInputStream(bb2.array(), s2, bb2.limit())));
} catch (IOException ioe) {
mLog.error("Unable to instantiate tuples for comparison: " + ioe.getMessage());
throw new RuntimeException(ioe.getMessage(), ioe);
}
// delegate to compareTuple
return compareTuple(t1, t2);
}
} else { // compare DataTypes
if (dt1 < dt2)
rc = -1;
else
rc = 1;
}
// flip if the order is descending
if (rc != 0) {
if (!mWholeTuple && !mAsc[i])
rc *= -1;
else if (mWholeTuple && !mAsc[0])
rc *= -1;
}
}
}
return rc;
}