in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java [1748:1909]
/**
 * Builds the next {@link TsBlock} from the rows starting at the current {@code index}.
 *
 * <p>The block is produced in two passes over the same row range:
 *
 * <ol>
 *   <li><b>Time column.</b> Rows whose value index is marked in {@code allValueColDeletedMap} or
 *       for which {@code isTimeDeleted} returns true are skipped. A remaining row has its
 *       timestamp written only when the next surviving row carries a different timestamp (or
 *       there is no next row) and the timestamp is not covered by {@code timeColumnDeletion};
 *       otherwise the row is recorded in a lazily created "invalid" bitmap. The pass stops once
 *       {@code maxNumberOfPointsInPage} timestamps have been written or the rows are exhausted.
 *   <li><b>Value columns.</b> For every requested column the same range is replayed. Rows marked
 *       invalid are not emitted, but their non-null values are remembered so that the row which
 *       is finally emitted for a timestamp can take the most recent non-null value seen for that
 *       timestamp. Columns that do not exist in this list, null cells and cells covered by the
 *       column's deletion list are emitted as nulls.
 * </ol>
 *
 * <p>When {@code ignoreAllNullRows} is set and {@code needRebuildTsBlock} reports it necessary,
 * the block is rebuilt through {@code reBuildTsBlock}. The resulting block is appended to {@code
 * tsBlocks} and {@code probeNext} is reset to {@code false}.
 *
 * @return the block built for this batch
 */
public TsBlock nextBatch() {
  TsBlockBuilder builder = new TsBlockBuilder(dataTypeList);
  // Time column
  TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
  int validRowCount = 0;
  // Rows with a duplicated or deleted timestamp are invalid; a marked bit means the row must not
  // be emitted. Allocated lazily so the common no-duplicate case pays nothing.
  BitMap timeInvalidInfo = null;
  int[] deleteCursor = {0};
  // Remember where this batch starts so the value pass can replay exactly the same rows.
  int startIndex = index;
  // time column
  for (; index < rows; index++) {
    if (validRowCount >= maxNumberOfPointsInPage) {
      break;
    }
    // skip empty row
    if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(getValueIndex(index))) {
      continue;
    }
    if (isTimeDeleted(index)) {
      continue;
    }
    // Find the next row that survives the two skip conditions above.
    int nextRowIndex = index + 1;
    while (nextRowIndex < rows
        && ((allValueColDeletedMap != null
                && allValueColDeletedMap.isMarked(getValueIndex(nextRowIndex)))
            || (isTimeDeleted(nextRowIndex)))) {
      nextRowIndex++;
    }
    long time = getTime(index);
    if ((nextRowIndex == rows || time != getTime(nextRowIndex))
        && !isPointDeleted(time, timeColumnDeletion, deleteCursor)) {
      timeBuilder.writeLong(time);
      validRowCount++;
    } else {
      if (Objects.isNull(timeInvalidInfo)) {
        timeInvalidInfo = new BitMap(rows);
      }
      timeInvalidInfo.mark(index);
    }
    // Jump over the skipped rows; the loop's index++ then lands on nextRowIndex.
    index = nextRowIndex - 1;
  }
  // hasAnyNonNullValue[i] becomes true once any column contributes a non-null value to the
  // i-th emitted row.
  boolean[] hasAnyNonNullValue = new boolean[validRowCount];
  int columnCount = dataTypeList.size();
  int currentWriteRowIndex;
  // value columns
  for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
    int validColumnIndex = columnIndexList.get(columnIndex);
    // Each column walks its own deletion list from the beginning.
    deleteCursor = new int[] {0};
    // Pair of Time and Index of the most recent row holding a non-null value in this column.
    // Only needed when the time pass found at least one invalid row.
    Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null;
    if (Objects.nonNull(timeInvalidInfo)) {
      lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE, null);
    }
    ColumnBuilder valueBuilder = builder.getColumnBuilder(columnIndex);
    currentWriteRowIndex = 0;
    for (int sortedRowIndex = startIndex; sortedRowIndex < index; sortedRowIndex++) {
      // skip empty row
      if ((allValueColDeletedMap != null
              && allValueColDeletedMap.isMarked(getValueIndex(sortedRowIndex)))
          || (isTimeDeleted(sortedRowIndex))) {
        continue;
      }
      // skip time duplicated or totally deleted rows
      if (Objects.nonNull(timeInvalidInfo)) {
        // Record the value before skipping, so a later row with the same timestamp can use it.
        if (!outer.isNullValue(getValueIndex(sortedRowIndex), validColumnIndex)) {
          lastValidPointIndexForTimeDupCheck.left = getTime(sortedRowIndex);
          lastValidPointIndexForTimeDupCheck.right = getValueIndex(sortedRowIndex);
        }
        if (timeInvalidInfo.isMarked(sortedRowIndex)) {
          continue;
        }
      }
      // append null value when query column does not exist in current aligned TVList
      if (validColumnIndex < 0 || validColumnIndex >= dataTypes.size()) {
        valueBuilder.appendNull();
        currentWriteRowIndex++;
        continue;
      }
      // The part of code solves the following problem:
      // Time: 1,2,2,3
      // Value: 1,2,null,null
      // When rowIndex:1, pair(min,null), timeDuplicateInfo:false, write(T:1,V:1)
      // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing value
      // When rowIndex:3, pair(2,2), timeDuplicateInfo:false, T:2==pair.left:2, write(T:2,V:2)
      // When rowIndex:4, pair(2,2), timeDuplicateInfo:false, T:3!=pair.left:2,
      // write(T:3,V:null)
      int originRowIndex;
      // The pair starts as (Long.MIN_VALUE, null). A row whose timestamp is Long.MIN_VALUE would
      // match that initial state before any non-null value was recorded, and unboxing the null
      // index would throw a NullPointerException, so the recorded index must be present too.
      if (Objects.nonNull(lastValidPointIndexForTimeDupCheck)
          && Objects.nonNull(lastValidPointIndexForTimeDupCheck.right)
          && (getTime(sortedRowIndex) == lastValidPointIndexForTimeDupCheck.left)) {
        originRowIndex = lastValidPointIndexForTimeDupCheck.right;
      } else {
        originRowIndex = getValueIndex(sortedRowIndex);
      }
      if (outer.isNullValue(originRowIndex, validColumnIndex)
          || isPointDeleted(
              getTime(sortedRowIndex),
              Objects.isNull(valueColumnsDeletionList)
                  ? null
                  : valueColumnsDeletionList.get(columnIndex),
              deleteCursor)) {
        valueBuilder.appendNull();
        currentWriteRowIndex++;
        continue;
      }
      hasAnyNonNullValue[currentWriteRowIndex++] = true;
      switch (dataTypes.get(validColumnIndex)) {
        case BOOLEAN:
          valueBuilder.writeBoolean(getBooleanByValueIndex(originRowIndex, validColumnIndex));
          break;
        case INT32:
        case DATE:
          valueBuilder.writeInt(getIntByValueIndex(originRowIndex, validColumnIndex));
          break;
        case INT64:
        case TIMESTAMP:
          valueBuilder.writeLong(getLongByValueIndex(originRowIndex, validColumnIndex));
          break;
        case FLOAT:
          float valueF = getFloatByValueIndex(originRowIndex, validColumnIndex);
          // Rounding is applied only when an encoding list was supplied.
          if (encodingList != null) {
            valueF =
                roundValueWithGivenPrecision(
                    valueF, floatPrecision, encodingList.get(columnIndex));
          }
          valueBuilder.writeFloat(valueF);
          break;
        case DOUBLE:
          double valueD = getDoubleByValueIndex(originRowIndex, validColumnIndex);
          if (encodingList != null) {
            valueD =
                roundValueWithGivenPrecision(
                    valueD, floatPrecision, encodingList.get(columnIndex));
          }
          valueBuilder.writeDouble(valueD);
          break;
        case TEXT:
        case BLOB:
        case STRING:
          valueBuilder.writeBinary(getBinaryByValueIndex(originRowIndex, validColumnIndex));
          break;
        default:
          // NOTE(review): other data types write nothing here although the row was already
          // flagged as non-null above - confirm no further types can reach this switch.
          break;
      }
    }
  }
  builder.declarePositions(validRowCount);
  TsBlock tsBlock = builder.build();
  if (ignoreAllNullRows && needRebuildTsBlock(hasAnyNonNullValue)) {
    // if exist all null rows, at most have validRowCount - 1 valid rows
    tsBlock = reBuildTsBlock(hasAnyNonNullValue, validRowCount, dataTypeList, tsBlock);
  }
  tsBlocks.add(tsBlock);
  probeNext = false;
  return tsBlock;
}