in tsfile-viewer-core/src/main/java/org/apache/iotdb/tool/core/service/TsFileAnalyserV13.java [127:292]
/**
 * Sequentially scans the data section of the TsFile and records where each chunk group starts.
 *
 * <p>The scan begins right after the file header (the magic string plus one extra byte) and
 * reads marker by marker until {@code MetaMarker.SEPARATOR} is met:
 *
 * <ul>
 *   <li>chunk header markers: the chunk header is read and the chunk data is skipped;
 *   <li>{@code CHUNK_GROUP_HEADER}: the previously open chunk group (if any) is recorded in
 *       {@code chunkGroupInfoList}, then the new group's position and device id are remembered;
 *   <li>{@code OPERATION_INDEX_RANGE}: the currently open chunk group (if any) is recorded and
 *       closed, then the plan index is read;
 *   <li>any other marker is treated as file corruption.
 * </ul>
 *
 * <p>The position stored for a chunk group is the reader position observed after its marker has
 * been read and before the chunk group header itself is read.
 *
 * <p>Any exception raised during the scan (including the corruption error above) is logged as a
 * warning and ends the scan; chunk groups already added to {@code chunkGroupInfoList} are kept,
 * while a chunk group that was still open at that moment is not recorded.
 *
 * @throws IOException may be raised when positioning the reader at the start of the data
 *     section, or when querying the reader position while reporting a scan failure
 */
private void initTsFileAnalysed() throws IOException {
  // Skip the file header: the magic string followed by one extra byte
  // (presumably the format version number -- confirm against the TsFile layout).
  long headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES;
  reader.position(headerLength);

  byte marker;
  // Device id of the chunk group currently being scanned; null when no group is open.
  String lastDeviceId = null;
  // Position of the chunk group currently being scanned; -1 once the group has been closed by an
  // OPERATION_INDEX_RANGE marker.
  long lastChunkGroupPosition = 0;

  try {
    while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
      switch (marker) {
        case MetaMarker.CHUNK_HEADER:
        case MetaMarker.TIME_CHUNK_HEADER:
        case MetaMarker.VALUE_CHUNK_HEADER:
        case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
        case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
        case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
          ChunkHeader chunkHeader = reader.readChunkHeader(marker);
          // Skip this chunk: the chunk header has already been read, so advancing by dataSize
          // lands on the next marker. The header is intentionally not retained.
          reader.position(reader.position() + chunkHeader.getDataSize());
          // Update the progress indicator.
          setRateOfProcess();
          break;
        case MetaMarker.CHUNK_GROUP_HEADER:
          // If reading the new ChunkGroupHeader fails, that chunk group is dropped because the
          // correctness of its deviceId cannot be guaranteed.
          logger.info("Starting read a new ChunkGroupHeader, lastDeviceId:{}", lastDeviceId);
          // Update the progress indicator.
          setRateOfProcess();
          if (lastDeviceId != null) {
            // Store the information of the previous chunk group before starting a new one.
            chunkGroupInfoList.add(new ChunkGroupInfo(lastDeviceId, lastChunkGroupPosition));
          }
          // Remember where this chunk group starts; the marker byte has already been read.
          lastChunkGroupPosition = reader.position();
          ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
          lastDeviceId = chunkGroupHeader.getDeviceID();
          break;
        case MetaMarker.OPERATION_INDEX_RANGE:
          // Plan index: marks the end of a chunk group (usable for things like resumable
          // transfer, checkpoints and snapshots).
          logger.info("Starting read OperationIndexRange, lastDeviceId:{}", lastDeviceId);
          setRateOfProcess();
          if (lastDeviceId != null) {
            chunkGroupInfoList.add(new ChunkGroupInfo(lastDeviceId, lastChunkGroupPosition));
            // Mark the group as closed so it is not recorded a second time later.
            lastDeviceId = null;
            lastChunkGroupPosition = -1;
          }
          reader.readPlanIndex();
          break;
        default:
          // The disk file is corrupted; using this file may be dangerous.
          logger.error("Unexpected marker:{}", marker);
          throw new IOException("Unexpected marker " + marker);
      }
    }
    // The separator marker was reached, so the whole data section has been read and the last
    // chunk group is complete.
    if (lastDeviceId != null && lastChunkGroupPosition != -1) {
      logger.info("Read the tail of the data section, the lastDeviceId:{}", lastDeviceId);
      setRateOfProcess();
      chunkGroupInfoList.add(new ChunkGroupInfo(lastDeviceId, lastChunkGroupPosition));
    }
  } catch (Exception e) {
    // Deliberate best-effort: keep whatever chunk groups were collected so far. The exception is
    // passed as the trailing argument so the logger also emits its stack trace.
    logger.warn(
        "TsFile {} self-check cannot proceed at position {}, recovered, because : {}",
        filePath,
        reader.position(),
        e.getMessage(),
        e);
  }
}