in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java [502:632]
/**
 * Encodes the sorted rows of {@code list} and hands one {@link AlignedChunkWriterImpl} per entry
 * of {@code chunkRange} to {@code ioTaskQueue}.
 *
 * <p>For every page, all value columns are written first (column by column, each followed by
 * {@code nextColumn()}), then the time column is written in one batch. Both passes use the same
 * row-skip predicate so that every value column receives exactly as many points as the time
 * column of the same page.
 *
 * @param ioTaskQueue queue that receives each finished chunk writer
 * @param chunkRange one list per chunk; every list holds consecutive {@code (first, last)} pairs
 *     of inclusive sorted-row indexes, one pair per page
 * @param timeDuplicateInfo {@code null} when no duplicate handling is needed; otherwise indexed by
 *     sorted row index, and rows flagged {@code true} are not written (presumably because a later
 *     row carries the same timestamp; the array is built by the caller)
 * @param allValueColDeletedMap may be {@code null}; rows whose value index is marked are skipped
 * @param maxNumberOfPointsInPage upper bound used to size the per-page time buffer
 */
private void handleEncoding(
    BlockingQueue<Object> ioTaskQueue,
    List<List<Integer>> chunkRange,
    boolean[] timeDuplicateInfo,
    BitMap allValueColDeletedMap,
    int maxNumberOfPointsInPage) {
  List<TSDataType> dataTypes = list.getTsDataTypes();
  // Per column: (time, value index) of the most recent non-null point. It is kept across pages
  // and chunks, and is only populated when timeDuplicateInfo is present.
  Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new Pair[dataTypes.size()];
  for (List<Integer> pageRange : chunkRange) {
    AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(schemaList);
    for (int pageNum = 0; pageNum < pageRange.size() / 2; pageNum += 1) {
      int firstRowInPage = pageRange.get(pageNum * 2);
      int lastRowInPage = pageRange.get(pageNum * 2 + 1);

      // Value columns of this page.
      for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) {
        // Pair of Time and Index
        if (Objects.nonNull(timeDuplicateInfo)
            && lastValidPointIndexForTimeDupCheck[columnIndex] == null) {
          lastValidPointIndexForTimeDupCheck[columnIndex] = new Pair<>(Long.MIN_VALUE, null);
        }
        Pair<Long, Integer> lastValidPoint = lastValidPointIndexForTimeDupCheck[columnIndex];
        TSDataType tsDataType = dataTypes.get(columnIndex);
        for (int sortedRowIndex = firstRowInPage;
            sortedRowIndex <= lastRowInPage;
            sortedRowIndex++) {
          // Skip empty and time-deleted rows. This must match the time-column pass below,
          // otherwise the value pages and the time page end up with different point counts.
          if (isRowExcludedFromEncoding(sortedRowIndex, allValueColDeletedMap)) {
            continue;
          }
          long time = list.getTime(sortedRowIndex);
          int valueIndex = list.getValueIndex(sortedRowIndex);
          if (Objects.nonNull(timeDuplicateInfo)) {
            // Remember the latest non-null value so it can stand in for a null at the same time.
            if (!list.isNullValue(valueIndex, columnIndex)) {
              lastValidPoint.left = time;
              lastValidPoint.right = valueIndex;
            }
            // skip time duplicated rows
            if (timeDuplicateInfo[sortedRowIndex]) {
              continue;
            }
          }
          // This part of the code solves the following problem:
          // Time: 1,2,2,3
          // Value: 1,2,null,null
          // When rowIndex:1, pair(min,null), timeDuplicateInfo:false, write(T:1,V:1)
          // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing value
          // When rowIndex:3, pair(2,2), timeDuplicateInfo:false, T:2==pair.left:2, write(T:2,V:2)
          // When rowIndex:4, pair(2,2), timeDuplicateInfo:false, T:3!=pair.left:2,
          // write(T:3,V:null)
          int originRowIndex = valueIndex;
          // The right != null guard covers a row whose timestamp equals the Long.MIN_VALUE
          // sentinel before any non-null value was recorded; unboxing null would throw.
          if (Objects.nonNull(lastValidPoint)
              && lastValidPoint.right != null
              && time == lastValidPoint.left) {
            originRowIndex = lastValidPoint.right;
          }
          writeColumnValue(alignedChunkWriter, tsDataType, time, originRowIndex, columnIndex);
        }
        alignedChunkWriter.nextColumn();
      }

      // Time column of this page.
      long[] times = new long[Math.min(maxNumberOfPointsInPage, list.rowCount())];
      int pointsInPage = 0;
      for (int sortedRowIndex = firstRowInPage;
          sortedRowIndex <= lastRowInPage;
          sortedRowIndex++) {
        if (isRowExcludedFromEncoding(sortedRowIndex, allValueColDeletedMap)) {
          continue;
        }
        if (Objects.isNull(timeDuplicateInfo) || !timeDuplicateInfo[sortedRowIndex]) {
          times[pointsInPage++] = list.getTime(sortedRowIndex);
        }
      }
      alignedChunkWriter.write(times, pointsInPage, 0);
    }
    alignedChunkWriter.sealCurrentPage();
    alignedChunkWriter.clearPageWriter();
    try {
      ioTaskQueue.put(alignedChunkWriter);
    } catch (InterruptedException e) {
      // Restore the interrupt flag for the caller; this chunk writer is not enqueued.
      Thread.currentThread().interrupt();
    }
  }
}

/**
 * Returns {@code true} when the row must be left out of both the value columns and the time
 * column: either its value index is marked in {@code allValueColDeletedMap}, or {@code list}
 * reports the row as time-deleted.
 */
private boolean isRowExcludedFromEncoding(int sortedRowIndex, BitMap allValueColDeletedMap) {
  return (allValueColDeletedMap != null
          && allValueColDeletedMap.isMarked(list.getValueIndex(sortedRowIndex)))
      || list.isTimeDeleted(sortedRowIndex);
}

/**
 * Writes one point of the current value column, reading the value stored at {@code
 * originRowIndex}. A null cell is written with a placeholder value and the null flag set; data
 * types not listed in the switch are ignored.
 */
private void writeColumnValue(
    AlignedChunkWriterImpl alignedChunkWriter,
    TSDataType tsDataType,
    long time,
    int originRowIndex,
    int columnIndex) {
  boolean isNull = list.isNullValue(originRowIndex, columnIndex);
  switch (tsDataType) {
    case BOOLEAN:
      alignedChunkWriter.writeByColumn(
          time, !isNull && list.getBooleanByValueIndex(originRowIndex, columnIndex), isNull);
      break;
    case INT32:
    case DATE:
      alignedChunkWriter.writeByColumn(
          time, isNull ? 0 : list.getIntByValueIndex(originRowIndex, columnIndex), isNull);
      break;
    case INT64:
    case TIMESTAMP:
      alignedChunkWriter.writeByColumn(
          time, isNull ? 0 : list.getLongByValueIndex(originRowIndex, columnIndex), isNull);
      break;
    case FLOAT:
      alignedChunkWriter.writeByColumn(
          time, isNull ? 0 : list.getFloatByValueIndex(originRowIndex, columnIndex), isNull);
      break;
    case DOUBLE:
      alignedChunkWriter.writeByColumn(
          time, isNull ? 0 : list.getDoubleByValueIndex(originRowIndex, columnIndex), isNull);
      break;
    case TEXT:
    case STRING:
    case BLOB:
      alignedChunkWriter.writeByColumn(
          time, isNull ? null : list.getBinaryByValueIndex(originRowIndex, columnIndex), isNull);
      break;
    default:
      break;
  }
}