in tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java [228:363]
public Tuple next() throws IOException {
if(eof) return null;
if (buffer.remaining() < headerSize) {
if (!fillBuffer()) {
return null;
}
}
// backup the buffer state
int bufferLimit = buffer.limit();
int recordSize = buffer.getInt();
int nullFlagSize = buffer.getShort();
buffer.limit(buffer.position() + nullFlagSize);
nullFlags.fromByteBuffer(buffer);
// restore the start of record contents
buffer.limit(bufferLimit);
//buffer.position(recordOffset + headerSize);
if (buffer.remaining() < (recordSize - headerSize)) {
if (!fillBuffer()) {
return null;
}
}
recordCount++;
for (int i = 0; i < columnTypes.length; i++) {
// check if the i'th column is null
if (nullFlags.get(i)) {
tuple.put(i, DatumFactory.createNullDatum());
continue;
}
switch (columnTypes[i].getType()) {
case BOOLEAN :
tuple.put(i, DatumFactory.createBool(buffer.get()));
break;
case BIT :
tuple.put(i, DatumFactory.createBit(buffer.get()));
break;
case CHAR :
int realLen = readRawVarint32();
byte[] buf = new byte[realLen];
buffer.get(buf);
tuple.put(i, DatumFactory.createChar(buf));
break;
case INT2 :
tuple.put(i, DatumFactory.createInt2(buffer.getShort()));
break;
case INT4 :
tuple.put(i, DatumFactory.createInt4(decodeZigZag32(readRawVarint32())));
break;
case INT8 :
tuple.put(i, DatumFactory.createInt8(decodeZigZag64(readRawVarint64())));
break;
case FLOAT4 :
tuple.put(i, DatumFactory.createFloat4(buffer.getFloat()));
break;
case FLOAT8 :
tuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
break;
case TEXT : {
int len = readRawVarint32();
byte [] strBytes = new byte[len];
buffer.get(strBytes);
tuple.put(i, DatumFactory.createText(new String(strBytes)));
break;
}
case BLOB : {
int len = readRawVarint32();
byte [] rawBytes = new byte[len];
buffer.get(rawBytes);
tuple.put(i, DatumFactory.createBlob(rawBytes));
break;
}
case PROTOBUF: {
int len = readRawVarint32();
byte [] rawBytes = new byte[len];
buffer.get(rawBytes);
ProtobufDatumFactory factory = ProtobufDatumFactory.get(columnTypes[i]);
Message.Builder builder = factory.newBuilder();
builder.mergeFrom(rawBytes);
tuple.put(i, factory.createDatum(builder.build()));
break;
}
case INET4 :
byte [] ipv4Bytes = new byte[4];
buffer.get(ipv4Bytes);
tuple.put(i, DatumFactory.createInet4(ipv4Bytes));
break;
case DATE: {
int val = buffer.getInt();
if (val < Integer.MIN_VALUE + 1) {
tuple.put(i, DatumFactory.createNullDatum());
} else {
tuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val));
}
break;
}
case TIME:
case TIMESTAMP: {
long val = buffer.getLong();
if (val < Long.MIN_VALUE + 1) {
tuple.put(i, DatumFactory.createNullDatum());
} else {
tuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val));
}
break;
}
case NULL_TYPE:
tuple.put(i, NullDatum.get());
break;
default:
}
}
if(!buffer.hasRemaining() && channel.position() == fileSize){
eof = true;
}
return new VTuple(tuple);
}