in tsfile-viewer-core/src/main/java/org/apache/iotdb/tool/core/service/TsFileAnalyserV13.java [382:520]
/**
 * Reads one chunk through {@code reader}, derives the chunk-level statistics and records the
 * chunk header and chunk metadata in {@code chunkGroupMetaInfo}.
 *
 * <p>For a chunk whose masked type equals {@code MetaMarker.CHUNK_HEADER}, the statistics carried
 * by every non-empty page header are merged and the page payloads are skipped. Otherwise a single
 * page header without statistics is read and the page is decoded point by point to rebuild the
 * statistics.
 *
 * <p>The measurement schema of the chunk is also appended to {@code measurementSchemaList}.
 *
 * @param chunkGroupMetaInfo supplies the chunk marker and the cached time batches, and receives
 *     the chunk header and chunk metadata produced by this call
 * @throws IOException if the reader fails while reading headers or page data
 */
public void fetchChunkInfo(ChunkGroupMetaInfo chunkGroupMetaInfo) throws IOException {
  // NOTE(review): the -1 presumably accounts for a marker byte that was consumed before this
  // call -- confirm against the caller.
  long chunkOffset = reader.position() - 1;
  ChunkHeader chunkHeader = reader.readChunkHeader(chunkGroupMetaInfo.getMarker());
  String measurementID = chunkHeader.getMeasurementID();
  TSDataType dataType = chunkHeader.getDataType();

  IMeasurementSchema measurementSchema =
      new MeasurementSchema(
          measurementID,
          dataType,
          chunkHeader.getEncodingType(),
          chunkHeader.getCompressionType());
  measurementSchemaList.add(measurementSchema);

  // The cached time batches are reset whenever a VECTOR-typed chunk is encountered.
  if (dataType == TSDataType.VECTOR) {
    chunkGroupMetaInfo.getTimeBatch().clear();
  }

  Statistics<? extends Serializable> chunkStatistics = Statistics.getStatsByType(dataType);
  // NOTE(review): a chunk whose data size is 0 stays NON_ALIGNED regardless of its chunk-type
  // bits, so its metadata lands in the non-aligned list -- confirm this is intended for empty
  // aligned chunks.
  ChunkAlignment alignment = ChunkAlignment.NON_ALIGNED;
  int chunkDataSize = chunkHeader.getDataSize();
  if (chunkDataSize > 0) {
    alignment = resolveChunkAlignment(chunkHeader);
    if (((byte) (chunkHeader.getChunkType() & CHUNK_HEADER_MASK)) == MetaMarker.CHUNK_HEADER) {
      // Page headers carry statistics: merge them instead of decoding the data.
      mergeMultiPageStatistics(chunkHeader, chunkDataSize, chunkStatistics);
    } else {
      // Single page without statistics: every point has to be decoded.
      computeSinglePageStatistics(chunkGroupMetaInfo, chunkHeader, alignment, chunkStatistics);
    }
  }

  // Build the chunk metadata and file it under the list that matches the chunk's alignment.
  ChunkMetadata chunkMetadata =
      new ChunkMetadata(measurementID, dataType, chunkOffset, chunkStatistics);
  switch (alignment) {
    case TIME_COLUMN:
      chunkGroupMetaInfo.setAlignedTimeChunkMetadata(chunkMetadata);
      break;
    case VALUE_COLUMN:
      chunkGroupMetaInfo.getAlignedValueChunkMetadata().add(chunkMetadata);
      break;
    default:
      chunkGroupMetaInfo.getChunkMetadataList().add(chunkMetadata);
      break;
  }
  chunkGroupMetaInfo.getChunkHeaderList().add(chunkHeader);
}

/** Classifies a chunk as time column, value column or non-aligned from its chunk-type bits. */
private static ChunkAlignment resolveChunkAlignment(ChunkHeader chunkHeader) {
  if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
      == TsFileConstant.TIME_COLUMN_MASK) {
    return ChunkAlignment.TIME_COLUMN;
  }
  if ((chunkHeader.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK)
      == TsFileConstant.VALUE_COLUMN_MASK) {
    return ChunkAlignment.VALUE_COLUMN;
  }
  return ChunkAlignment.NON_ALIGNED;
}

/**
 * Walks every page of a chunk whose page headers carry statistics, merging the statistics of
 * non-empty pages into {@code chunkStatistics} and skipping the page payloads.
 */
private void mergeMultiPageStatistics(
    ChunkHeader chunkHeader, int chunkDataSize, Statistics<? extends Serializable> chunkStatistics)
    throws IOException {
  int remainingSize = chunkDataSize;
  while (remainingSize > 0) {
    PageHeader pageHeader = reader.readPageHeader(chunkHeader.getDataType(), true);
    if (pageHeader.getUncompressedSize() != 0) {
      // Only non-empty pages contribute statistics.
      chunkStatistics.mergeStatistics(pageHeader.getStatistics());
    }
    // Skip the page payload; only the header is needed here.
    reader.skipPageData(pageHeader);
    remainingSize -= pageHeader.getSerializedPageSize();
    chunkHeader.increasePageNums(1);
  }
}

/**
 * Decodes the only page of a chunk whose page header has no statistics and feeds every decoded
 * point into {@code chunkStatistics}.
 */
private void computeSinglePageStatistics(
    ChunkGroupMetaInfo chunkGroupMetaInfo,
    ChunkHeader chunkHeader,
    ChunkAlignment alignment,
    Statistics<? extends Serializable> chunkStatistics)
    throws IOException {
  TSDataType dataType = chunkHeader.getDataType();
  PageHeader pageHeader = reader.readPageHeader(dataType, false);
  Decoder valueDecoder = Decoder.getDecoderByType(chunkHeader.getEncodingType(), dataType);
  ByteBuffer pageData = reader.readPage(pageHeader, chunkHeader.getCompressionType());
  Decoder timeDecoder =
      Decoder.getDecoderByType(
          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
          TSDataType.INT64);

  if (alignment == ChunkAlignment.TIME_COLUMN) {
    // Time chunk: cache the timestamps so that following value chunks can be decoded.
    TimePageReader timePageReader = new TimePageReader(pageHeader, pageData, timeDecoder);
    long[] currentTimeBatch = timePageReader.getNextTimeBatch();
    chunkGroupMetaInfo.getTimeBatch().add(currentTimeBatch);
    for (long currentTime : currentTimeBatch) {
      chunkStatistics.update(currentTime);
    }
  } else if (alignment == ChunkAlignment.VALUE_COLUMN) {
    // Value chunk: values are paired by index with the first cached time batch.
    // NOTE(review): get(0) throws if no time batch has been cached -- confirm a single-page
    // time chunk always precedes a single-page value chunk.
    ValuePageReader valuePageReader =
        new ValuePageReader(pageHeader, pageData, dataType, valueDecoder);
    long[] timestamps = chunkGroupMetaInfo.getTimeBatch().get(0);
    TsPrimitiveType[] valueBatch = valuePageReader.nextValueBatch(timestamps);
    if (valueBatch != null) {
      for (int i = 0; i < valueBatch.length; i++) {
        TsPrimitiveType value = valueBatch[i];
        if (value == null) {
          // No value at this timestamp; nothing to add to the statistics.
          continue;
        }
        setChunkStatistics(chunkStatistics, timestamps[i], value, dataType);
      }
    }
  } else {
    // Non-aligned chunk: the page holds both timestamps and values.
    PageReader pageReader =
        new PageReader(pageHeader, pageData, dataType, valueDecoder, timeDecoder, null);
    BatchData batchData = pageReader.getAllSatisfiedPageData();
    while (batchData.hasCurrent()) {
      setChunkStatistics(
          chunkStatistics, batchData.currentTime(), batchData.currentTsPrimitiveType(), dataType);
      batchData.next();
    }
  }
  chunkHeader.increasePageNums(1);
}

/** Kind of chunk with respect to aligned time series, derived from the chunk-type bits. */
private enum ChunkAlignment {
  NON_ALIGNED,
  TIME_COLUMN,
  VALUE_COLUMN
}