in tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java [587:706]
/**
 * Serializes one tuple into the write buffer as a single record.
 *
 * <p>Record layout, as written by this method:
 * <ol>
 *   <li>a 4-byte int holding the total record length (header included),</li>
 *   <li>a 2-byte short holding the length of the null-flag byte array,</li>
 *   <li>the null-flag bytes,</li>
 *   <li>the encoded values of every non-null column, in schema order.</li>
 * </ol>
 * Space for the header is skipped first and filled in after all fields are written,
 * because the record length and null flags are only known at that point.
 *
 * @param t the tuple to append; columns are read by index in schema order
 * @throws IOException if a column has an unsupported data type, or if one of the
 *                     flush helpers called from here fails
 */
public void addTuple(Tuple t) throws IOException {
  // Upper bound on the encoded size of any non-length-prefixed field. Fixed-width types
  // need at most 8 bytes, but INT8 goes through writeRawVarint64(encodeZigZag64(..)), and
  // a varint of a 64-bit value can occupy up to 10 bytes. Reserving only 8 could overflow
  // the buffer when 8 or 9 bytes remain and a large-magnitude long is written.
  // NOTE(review): assumes writeRawVarint64 emits a standard 7-bits-per-byte varint - confirm.
  final int maxScalarSize = 10;

  if (buffer.remaining() < headerSize) {
    flushBuffer();
  }

  // Skip the row header; it is back-filled once the record body is complete.
  int recordOffset = buffer.position();
  buffer.position(recordOffset + headerSize);

  // Reset the null flags left over from the previous record.
  nullFlags.clear();

  for (int i = 0; i < schema.size(); i++) {
    if (enabledStats) {
      stats.analyzeField(i, t.get(i));
    }

    if (t.isNull(i)) {
      nullFlags.set(i);
      continue;
    }

    // Make room for the next scalar. When the helper reports that it flushed, the partial
    // record now starts at offset 0 of the buffer.
    if (flushBufferAndReplace(recordOffset, maxScalarSize)) {
      recordOffset = 0;
    }

    switch (columnTypes[i].getType()) {
      case NULL_TYPE:
        nullFlags.set(i);
        continue;

      case BOOLEAN:
      case BIT:
        buffer.put(t.getByte(i));
        break;

      case INT2:
        buffer.putShort(t.getInt2(i));
        break;

      case INT4:
        writeRawVarint32(encodeZigZag32(t.getInt4(i)));
        break;

      case INT8:
        writeRawVarint64(encodeZigZag64(t.getInt8(i)));
        break;

      case FLOAT4:
        buffer.putFloat(t.getFloat4(i));
        break;

      case FLOAT8:
        buffer.putDouble(t.getFloat8(i));
        break;

      case DATE:
        buffer.putInt(t.getInt4(i));
        break;

      case TIME:
      case TIMESTAMP:
        buffer.putLong(t.getInt8(i));
        break;

      // All variable-length types share one encoding: varint length prefix + raw bytes.
      case CHAR:
      case TEXT:
      case BLOB:
      case PROTOBUF: {
        byte[] payload = t.getBytes(i);
        int encodedSize = payload.length + computeRawVarint32Size(payload.length);
        // NOTE(review): a payload larger than the whole buffer presumably cannot be
        // accommodated by flushing alone - verify against flushBufferAndReplace.
        if (flushBufferAndReplace(recordOffset, encodedSize)) {
          recordOffset = 0;
        }
        writeRawVarint32(payload.length);
        buffer.put(payload);
        break;
      }

      case INET4:
        buffer.put(t.getBytes(i));
        break;

      default:
        throw new IOException("Cannot support data type: " + columnTypes[i].getType());
    }
  }

  // Back-fill the record header, then restore the write position to the end of the record.
  int recordEnd = buffer.position();
  int recordLength = recordEnd - recordOffset;
  buffer.position(recordOffset);
  buffer.putInt(recordLength);
  byte[] flags = nullFlags.toArray();
  buffer.putShort((short) flags.length);
  buffer.put(flags);

  pos += recordLength;
  buffer.position(recordEnd);

  if (enabledStats) {
    stats.incrementRow();
  }
}