public static TSQueryDataSet convertTsBlockByFetchSize()

in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java [108:301]


  /**
   * Serializes the time column and the value column at index 0 of the given blocks into a single
   * {@link TSQueryDataSet}.
   *
   * <p>Empty blocks are skipped. For every remaining row the timestamp is written as a long into
   * the time buffer. For the value column, only non-null values are written into the value buffer
   * (TEXT, BLOB and STRING values as a 4-byte length followed by the raw bytes), and one bit per
   * row is recorded in the bitmap buffer: the bit carries {@code FLAG} when the row has a value
   * and is left clear when the row is null. Bits are packed eight rows per byte in row order; the
   * last byte is left-shifted so that unused positions are the trailing ones.
   *
   * <p>The number of value columns is fixed to one, so any further columns of a block are ignored.
   *
   * @param tsBlocks blocks to serialize, processed in iteration order
   * @return the data set holding the time buffer, one value buffer and one bitmap buffer
   * @throws IOException if writing to the in-memory streams fails
   * @throws UnSupportedDataTypeException if the value column has a data type that is not handled
   */
  public static TSQueryDataSet convertTsBlockByFetchSize(List<TsBlock> tsBlocks)
      throws IOException {
    TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();

    // stream layout: [time, value_0, bitmap_0, ..., value_(n-1), bitmap_(n-1)]
    int columnNum = 1;
    int columnNumWithTime = columnNum * 2 + 1;
    ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
    DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
    for (int i = 0; i < columnNumWithTime; i++) {
      byteArrayOutputStreams[i] = new ByteArrayOutputStream();
      dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
    }

    // number of rows serialized by all previously processed blocks
    int rowCount = 0;
    // bits of each value column that have not been flushed to its bitmap stream yet
    int[] bitmaps = new int[columnNum];

    for (TsBlock tsBlock : tsBlocks) {
      if (tsBlock.isEmpty()) {
        continue;
      }

      int currentCount = tsBlock.getPositionCount();

      // serialize time column
      for (int i = 0; i < currentCount; i++) {
        dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
      }

      // serialize each value column and its bitmap
      for (int k = 0; k < columnNum; k++) {
        DataOutputStream valueOut = dataOutputStreams[2 * k + 1];
        DataOutputStream bitmapOut = dataOutputStreams[2 * (k + 1)];

        Column column = tsBlock.getColumn(k);
        TSDataType type = column.getDataType();
        // The switch stays outside the row loop so that an unsupported type is rejected before
        // anything of this column is written.
        switch (type) {
          case INT32:
          case DATE:
            for (int i = 0; i < currentCount; i++) {
              boolean hasValue = !column.isNull(i);
              if (hasValue) {
                valueOut.writeInt(column.getInt(i));
              }
              bitmaps[k] = appendBitmapFlag(bitmaps[k], hasValue, rowCount + i + 1, bitmapOut);
            }
            break;
          case INT64:
          case TIMESTAMP:
            for (int i = 0; i < currentCount; i++) {
              boolean hasValue = !column.isNull(i);
              if (hasValue) {
                valueOut.writeLong(column.getLong(i));
              }
              bitmaps[k] = appendBitmapFlag(bitmaps[k], hasValue, rowCount + i + 1, bitmapOut);
            }
            break;
          case FLOAT:
            for (int i = 0; i < currentCount; i++) {
              boolean hasValue = !column.isNull(i);
              if (hasValue) {
                valueOut.writeFloat(column.getFloat(i));
              }
              bitmaps[k] = appendBitmapFlag(bitmaps[k], hasValue, rowCount + i + 1, bitmapOut);
            }
            break;
          case DOUBLE:
            for (int i = 0; i < currentCount; i++) {
              boolean hasValue = !column.isNull(i);
              if (hasValue) {
                valueOut.writeDouble(column.getDouble(i));
              }
              bitmaps[k] = appendBitmapFlag(bitmaps[k], hasValue, rowCount + i + 1, bitmapOut);
            }
            break;
          case BOOLEAN:
            for (int i = 0; i < currentCount; i++) {
              boolean hasValue = !column.isNull(i);
              if (hasValue) {
                valueOut.writeBoolean(column.getBoolean(i));
              }
              bitmaps[k] = appendBitmapFlag(bitmaps[k], hasValue, rowCount + i + 1, bitmapOut);
            }
            break;
          case TEXT:
          case BLOB:
          case STRING:
            for (int i = 0; i < currentCount; i++) {
              boolean hasValue = !column.isNull(i);
              if (hasValue) {
                Binary binary = column.getBinary(i);
                valueOut.writeInt(binary.getLength());
                valueOut.write(binary.getValues());
              }
              bitmaps[k] = appendBitmapFlag(bitmaps[k], hasValue, rowCount + i + 1, bitmapOut);
            }
            break;
          default:
            throw new UnSupportedDataTypeException(
                String.format("Data type %s is not supported.", type));
        }
      }

      // every column of this block covered the same rows, so advance the counter only once
      rowCount += currentCount;
    }

    // feed the remaining bitmap: pad the last, partially filled byte on its low-order side
    int remaining = rowCount % 8;
    if (remaining != 0) {
      for (int k = 0; k < columnNum; k++) {
        dataOutputStreams[2 * (k + 1)].writeByte(bitmaps[k] << (8 - remaining));
      }
    }

    // toByteArray() already returns a fresh, exactly sized copy, so it can be wrapped directly
    tsQueryDataSet.setTime(ByteBuffer.wrap(byteArrayOutputStreams[0].toByteArray()));

    List<ByteBuffer> bitmapList = new LinkedList<>();
    List<ByteBuffer> valueList = new LinkedList<>();
    for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
      valueList.add(ByteBuffer.wrap(byteArrayOutputStreams[i].toByteArray()));
      bitmapList.add(ByteBuffer.wrap(byteArrayOutputStreams[i + 1].toByteArray()));
    }
    tsQueryDataSet.setBitmapList(bitmapList);
    tsQueryDataSet.setValueList(valueList);
    return tsQueryDataSet;
  }

  /**
   * Appends the indicator bit of one row to a pending bitmap and writes the bitmap out once it
   * covers eight rows.
   *
   * @param bitmap bits collected since the last flush
   * @param hasValue {@code true} if the row is not null, in which case {@code FLAG} is set
   * @param rowNumber 1-based number of the row among all rows serialized so far
   * @param bitmapOut stream receiving one byte for every eight rows
   * @return the bitmap to carry over to the next row; 0 right after a flush
   * @throws IOException if writing the byte fails
   */
  private static int appendBitmapFlag(
      int bitmap, boolean hasValue, int rowNumber, DataOutputStream bitmapOut) throws IOException {
    int updated = hasValue ? (bitmap << 1) | FLAG : bitmap << 1;
    if (rowNumber % 8 == 0) {
      bitmapOut.writeByte(updated);
      // we should clear the bitmap every 8 points
      return 0;
    }
    return updated;
  }