in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java [108:301]
public static TSQueryDataSet convertTsBlockByFetchSize(List<TsBlock> tsBlocks)
throws IOException {
TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
// one time column and each value column has an actual value buffer and a bitmap value to
// indicate whether it is a null
int columnNum = 1;
int columnNumWithTime = columnNum * 2 + 1;
DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
for (int i = 0; i < columnNumWithTime; i++) {
byteArrayOutputStreams[i] = new ByteArrayOutputStream();
dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
}
int rowCount = 0;
int[] valueOccupation = new int[columnNum];
// used to record a bitmap for every 8 points
int[] bitmaps = new int[columnNum];
for (TsBlock tsBlock : tsBlocks) {
if (tsBlock.isEmpty()) {
continue;
}
int currentCount = tsBlock.getPositionCount();
// serialize time column
for (int i = 0; i < currentCount; i++) {
// use columnOutput to write byte array
dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
}
// serialize each value column and its bitmap
for (int k = 0; k < columnNum; k++) {
// get DataOutputStream for current value column and its bitmap
DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1];
DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
Column column = tsBlock.getColumn(k);
TSDataType type = column.getDataType();
switch (type) {
case INT32:
case DATE:
for (int i = 0; i < currentCount; i++) {
rowCount++;
if (column.isNull(i)) {
bitmaps[k] = bitmaps[k] << 1;
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
dataOutputStream.writeInt(column.getInt(i));
valueOccupation[k] += 4;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
// we should clear the bitmap every 8 points
bitmaps[k] = 0;
}
}
break;
case INT64:
case TIMESTAMP:
for (int i = 0; i < currentCount; i++) {
rowCount++;
if (column.isNull(i)) {
bitmaps[k] = bitmaps[k] << 1;
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
dataOutputStream.writeLong(column.getLong(i));
valueOccupation[k] += 8;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
// we should clear the bitmap every 8 points
bitmaps[k] = 0;
}
}
break;
case FLOAT:
for (int i = 0; i < currentCount; i++) {
rowCount++;
if (column.isNull(i)) {
bitmaps[k] = bitmaps[k] << 1;
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
dataOutputStream.writeFloat(column.getFloat(i));
valueOccupation[k] += 4;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
// we should clear the bitmap every 8 points
bitmaps[k] = 0;
}
}
break;
case DOUBLE:
for (int i = 0; i < currentCount; i++) {
rowCount++;
if (column.isNull(i)) {
bitmaps[k] = bitmaps[k] << 1;
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
dataOutputStream.writeDouble(column.getDouble(i));
valueOccupation[k] += 8;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
// we should clear the bitmap every 8 points
bitmaps[k] = 0;
}
}
break;
case BOOLEAN:
for (int i = 0; i < currentCount; i++) {
rowCount++;
if (column.isNull(i)) {
bitmaps[k] = bitmaps[k] << 1;
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
dataOutputStream.writeBoolean(column.getBoolean(i));
valueOccupation[k] += 1;
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
// we should clear the bitmap every 8 points
bitmaps[k] = 0;
}
}
break;
case TEXT:
case BLOB:
case STRING:
for (int i = 0; i < currentCount; i++) {
rowCount++;
if (column.isNull(i)) {
bitmaps[k] = bitmaps[k] << 1;
} else {
bitmaps[k] = (bitmaps[k] << 1) | FLAG;
Binary binary = column.getBinary(i);
dataOutputStream.writeInt(binary.getLength());
dataOutputStream.write(binary.getValues());
valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength();
}
if (rowCount != 0 && rowCount % 8 == 0) {
dataBitmapOutputStream.writeByte(bitmaps[k]);
// we should clear the bitmap every 8 points
bitmaps[k] = 0;
}
}
break;
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.", type));
}
if (k != columnNum - 1) {
rowCount -= currentCount;
}
}
}
// feed the remaining bitmap
int remaining = rowCount % 8;
for (int k = 0; k < columnNum; k++) {
if (remaining != 0) {
DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining));
}
}
// calculate the time buffer size
int timeOccupation = rowCount * 8;
ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
timeBuffer.flip();
tsQueryDataSet.setTime(timeBuffer);
// calculate the bitmap buffer size
int bitmapOccupation = (rowCount + 7) / 8;
List<ByteBuffer> bitmapList = new LinkedList<>();
List<ByteBuffer> valueList = new LinkedList<>();
for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]);
valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
valueBuffer.flip();
valueList.add(valueBuffer);
ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
bitmapBuffer.flip();
bitmapList.add(bitmapBuffer);
}
tsQueryDataSet.setBitmapList(bitmapList);
tsQueryDataSet.setValueList(valueList);
return tsQueryDataSet;
}