in src/org/apache/pig/data/DataReaderWriter.java [222:366]
public static void writeDatum(
DataOutput out,
Object val) throws IOException {
// Read the data type
byte type = DataType.findType(val);
switch (type) {
case DataType.TUPLE:
Tuple t = (Tuple)val;
out.writeByte(DataType.TUPLE);
int sz = t.size();
out.writeInt(sz);
for (int i = 0; i < sz; i++) {
DataReaderWriter.writeDatum(out, t.get(i));
}
break;
case DataType.BAG:
DataBag bag = (DataBag)val;
out.writeByte(DataType.BAG);
out.writeLong(bag.size());
Iterator<Tuple> it = bag.iterator();
while (it.hasNext()) {
DataReaderWriter.writeDatum(out, it.next());
}
break;
case DataType.MAP: {
out.writeByte(DataType.MAP);
Map<String, Object> m = (Map<String, Object>)val;
out.writeInt(m.size());
Iterator<Map.Entry<String, Object> > i =
m.entrySet().iterator();
while (i.hasNext()) {
Map.Entry<String, Object> entry = i.next();
writeDatum(out, entry.getKey());
writeDatum(out, entry.getValue());
}
break;
}
case DataType.INTERNALMAP: {
out.writeByte(DataType.INTERNALMAP);
Map<Object, Object> m = (Map<Object, Object>)val;
out.writeInt(m.size());
Iterator<Map.Entry<Object, Object> > i =
m.entrySet().iterator();
while (i.hasNext()) {
Map.Entry<Object, Object> entry = i.next();
writeDatum(out, entry.getKey());
writeDatum(out, entry.getValue());
}
break;
}
case DataType.INTEGER:
out.writeByte(DataType.INTEGER);
out.writeInt((Integer)val);
break;
case DataType.LONG:
out.writeByte(DataType.LONG);
out.writeLong((Long)val);
break;
case DataType.FLOAT:
out.writeByte(DataType.FLOAT);
out.writeFloat((Float)val);
break;
case DataType.DOUBLE:
out.writeByte(DataType.DOUBLE);
out.writeDouble((Double)val);
break;
case DataType.BOOLEAN:
out.writeByte(DataType.BOOLEAN);
out.writeBoolean((Boolean)val);
break;
case DataType.BYTE:
out.writeByte(DataType.BYTE);
out.writeByte((Byte)val);
break;
case DataType.DATETIME:
out.writeByte(DataType.DATETIME);
out.writeLong(((DateTime)val).getMillis());
out.writeShort(((DateTime)val).getZone().getOffset((DateTime)val) / 60000);
break;
case DataType.BYTEARRAY: {
out.writeByte(DataType.BYTEARRAY);
DataByteArray bytes = (DataByteArray)val;
out.writeInt(bytes.size());
out.write(bytes.mData);
break;
}
case DataType.BIGINTEGER:
out.writeByte(DataType.BIGINTEGER);
byte[] bytes = ((BigInteger)val).toByteArray();
out.writeInt(bytes.length);
out.write(bytes);
break;
case DataType.BIGDECIMAL:
out.writeByte(DataType.BIGDECIMAL);
byte[] bt = ((BigDecimal)val).toString().getBytes(DataReaderWriter.UTF8);
out.writeInt(bt.length);
out.write(bt);
break;
case DataType.CHARARRAY: {
String s = (String)val;
byte[] utfBytes = s.getBytes(DataReaderWriter.UTF8);
int length = utfBytes.length;
if(length < DataReaderWriter.UNSIGNED_SHORT_MAX) {
out.writeByte(DataType.CHARARRAY);
out.writeShort(length);
out.write(utfBytes);
} else {
out.writeByte(DataType.BIGCHARARRAY);
out.writeInt(length);
out.write(utfBytes);
}
break;
}
case DataType.GENERIC_WRITABLECOMPARABLE :
out.writeByte(DataType.GENERIC_WRITABLECOMPARABLE);
//store the class name, so we know the class to create on read
writeDatum(out, val.getClass().getName());
Writable writable = (Writable)val;
writable.write(out);
break;
case DataType.NULL:
out.writeByte(DataType.NULL);
break;
default:
throw new RuntimeException("Unexpected data type " + type +
" found in stream.");
}
}