public TsBlock nextBatch()

in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java [1748:1909]


    public TsBlock nextBatch() {
      TsBlockBuilder builder = new TsBlockBuilder(dataTypeList);
      // Time column
      TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();

      int validRowCount = 0;
      // duplicated time or deleted time are all invalid, true if we don't need this row
      BitMap timeInvalidInfo = null;

      int[] deleteCursor = {0};
      int startIndex = index;
      // time column
      for (; index < rows; index++) {
        if (validRowCount >= maxNumberOfPointsInPage) {
          break;
        }
        // skip empty row
        if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(getValueIndex(index))) {
          continue;
        }
        if (isTimeDeleted(index)) {
          continue;
        }
        int nextRowIndex = index + 1;
        while (nextRowIndex < rows
            && ((allValueColDeletedMap != null
                    && allValueColDeletedMap.isMarked(getValueIndex(nextRowIndex)))
                || (isTimeDeleted(nextRowIndex)))) {
          nextRowIndex++;
        }
        long time = getTime(index);
        if ((nextRowIndex == rows || time != getTime(nextRowIndex))
            && !isPointDeleted(time, timeColumnDeletion, deleteCursor)) {
          timeBuilder.writeLong(time);
          validRowCount++;
        } else {
          if (Objects.isNull(timeInvalidInfo)) {
            timeInvalidInfo = new BitMap(rows);
          }
          timeInvalidInfo.mark(index);
        }
        index = nextRowIndex - 1;
      }

      boolean[] hasAnyNonNullValue = new boolean[validRowCount];
      int columnCount = dataTypeList.size();
      int currentWriteRowIndex;
      // value columns
      for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
        int validColumnIndex = columnIndexList.get(columnIndex);

        deleteCursor = new int[] {0};
        // Pair of Time and Index
        Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null;
        if (Objects.nonNull(timeInvalidInfo)) {
          lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE, null);
        }
        ColumnBuilder valueBuilder = builder.getColumnBuilder(columnIndex);
        currentWriteRowIndex = 0;
        for (int sortedRowIndex = startIndex; sortedRowIndex < index; sortedRowIndex++) {
          // skip empty row
          if ((allValueColDeletedMap != null
                  && allValueColDeletedMap.isMarked(getValueIndex(sortedRowIndex)))
              || (isTimeDeleted(sortedRowIndex))) {
            continue;
          }
          // skip time duplicated or totally deleted rows
          if (Objects.nonNull(timeInvalidInfo)) {
            if (!outer.isNullValue(getValueIndex(sortedRowIndex), validColumnIndex)) {
              lastValidPointIndexForTimeDupCheck.left = getTime(sortedRowIndex);
              lastValidPointIndexForTimeDupCheck.right = getValueIndex(sortedRowIndex);
            }
            if (timeInvalidInfo.isMarked(sortedRowIndex)) {
              continue;
            }
          }

          // append null value when query column does not exist in current aligned TVList
          if (validColumnIndex < 0 || validColumnIndex >= dataTypes.size()) {
            valueBuilder.appendNull();
            currentWriteRowIndex++;
            continue;
          }

          // The part of code solves the following problem:
          // Time: 1,2,2,3
          // Value: 1,2,null,null
          // When rowIndex:1, pair(min,null), timeDuplicateInfo:false, write(T:1,V:1)
          // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing value
          // When rowIndex:3, pair(2,2), timeDuplicateInfo:false, T:2==pair.left:2, write(T:2,V:2)
          // When rowIndex:4, pair(2,2), timeDuplicateInfo:false, T:3!=pair.left:2,
          // write(T:3,V:null)
          int originRowIndex;
          if (Objects.nonNull(lastValidPointIndexForTimeDupCheck)
              && (getTime(sortedRowIndex) == lastValidPointIndexForTimeDupCheck.left)) {
            originRowIndex = lastValidPointIndexForTimeDupCheck.right;
          } else {
            originRowIndex = getValueIndex(sortedRowIndex);
          }
          if (outer.isNullValue(originRowIndex, validColumnIndex)
              || isPointDeleted(
                  getTime(sortedRowIndex),
                  Objects.isNull(valueColumnsDeletionList)
                      ? null
                      : valueColumnsDeletionList.get(columnIndex),
                  deleteCursor)) {
            valueBuilder.appendNull();
            currentWriteRowIndex++;
            continue;
          }
          hasAnyNonNullValue[currentWriteRowIndex++] = true;
          switch (dataTypes.get(validColumnIndex)) {
            case BOOLEAN:
              valueBuilder.writeBoolean(getBooleanByValueIndex(originRowIndex, validColumnIndex));
              break;
            case INT32:
            case DATE:
              valueBuilder.writeInt(getIntByValueIndex(originRowIndex, validColumnIndex));
              break;
            case INT64:
            case TIMESTAMP:
              valueBuilder.writeLong(getLongByValueIndex(originRowIndex, validColumnIndex));
              break;
            case FLOAT:
              float valueF = getFloatByValueIndex(originRowIndex, validColumnIndex);
              if (encodingList != null) {
                valueF =
                    roundValueWithGivenPrecision(
                        valueF, floatPrecision, encodingList.get(columnIndex));
              }
              valueBuilder.writeFloat(valueF);
              break;
            case DOUBLE:
              double valueD = getDoubleByValueIndex(originRowIndex, validColumnIndex);
              if (encodingList != null) {
                valueD =
                    roundValueWithGivenPrecision(
                        valueD, floatPrecision, encodingList.get(columnIndex));
              }
              valueBuilder.writeDouble(valueD);
              break;
            case TEXT:
            case BLOB:
            case STRING:
              valueBuilder.writeBinary(getBinaryByValueIndex(originRowIndex, validColumnIndex));
              break;
            default:
              break;
          }
        }
      }
      builder.declarePositions(validRowCount);
      TsBlock tsBlock = builder.build();
      if (ignoreAllNullRows && needRebuildTsBlock(hasAnyNonNullValue)) {
        // if exist all null rows, at most have validRowCount - 1 valid rows
        tsBlock = reBuildTsBlock(hasAnyNonNullValue, validRowCount, dataTypeList, tsBlock);
      }
      tsBlocks.add(tsBlock);

      probeNext = false;
      return tsBlock;
    }