in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java [128:302]
/**
 * Validates the data layout of a TsFile by re-reading every chunk and page sequentially.
 *
 * <p>The following checks are performed:
 *
 * <ul>
 *   <li>the reader reports the file as complete, and the chunk metadata read from it is not
 *       empty;
 *   <li>every chunk header is found at an offset recorded in the chunk metadata, and its
 *       measurement id matches that metadata entry;
 *   <li>time chunks (data type {@code VECTOR}): each page's timestamps are checked with {@code
 *       validateTimeFrame} against the page header, or against the chunk metadata when the chunk
 *       header carries no per-page statistics;
 *   <li>value chunks: the n-th value chunk of a column in a chunk group must have a matching n-th
 *       time chunk with at least as many pages, and each value page is decoded against the
 *       timestamps of the corresponding time page;
 *   <li>non-aligned chunks: timestamps are strictly increasing within a page, consecutive pages do
 *       not overlap, and each page's first/last timestamp equals the start/end time in the page
 *       header (or chunk metadata when there are no per-page statistics);
 *   <li>every chunk group header has a non-null, non-empty device id.
 * </ul>
 *
 * <p>Every failed check is logged at error level. {@link IOException}, {@link
 * NegativeArraySizeException}, {@link IllegalArgumentException} and {@link
 * IndexOutOfBoundsException} raised while parsing are logged and reported as a failed validation
 * instead of being propagated.
 *
 * @param resource the resource whose file at {@code resource.getTsFilePath()} is validated
 * @return {@code true} if every check passes, {@code false} otherwise
 */
public static boolean validateTsFileDataCorrectness(TsFileResource resource) {
  try (TsFileSequenceReader reader = new TsFileSequenceReader(resource.getTsFilePath())) {
    if (!reader.isComplete()) {
      logger.error("{} {} illegal tsfile", resource.getTsFilePath(), VALIDATE_FAILED);
      return false;
    }
    Map<Long, IChunkMetadata> chunkMetadataMap = getChunkMetadata(reader);
    if (chunkMetadataMap.isEmpty()) {
      logger.error(
          "{} {} there is no data in the file", resource.getTsFilePath(), VALIDATE_FAILED);
      return false;
    }
    // Per chunk group: the page timestamps of every time chunk, in the order they were read.
    List<List<long[]>> alignedTimeBatches = new ArrayList<>();
    // Per chunk group: how many value chunks of each column have been read so far. The n-th
    // value chunk of a column is decoded against the n-th time chunk of the group.
    Map<String, Integer> valueColumn2TimeBatchIndex = new HashMap<>();
    // Skip the head magic string plus one byte (presumably the format version) to reach the
    // first marker.
    reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
    byte marker;
    while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
      switch (marker) {
        case MetaMarker.CHUNK_HEADER:
        case MetaMarker.TIME_CHUNK_HEADER:
        case MetaMarker.VALUE_CHUNK_HEADER:
        case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
        case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
        case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
          {
            long chunkOffset = reader.position();
            ChunkHeader header = reader.readChunkHeader(marker);
            // The one-byte marker was already consumed; metadata is keyed by the marker offset.
            IChunkMetadata chunkMetadata = chunkMetadataMap.get(chunkOffset - Byte.BYTES);
            // A missing entry means no metadata points at this chunk; report it instead of
            // failing with a NullPointerException.
            if (chunkMetadata == null
                || !chunkMetadata.getMeasurementUid().equals(header.getMeasurementID())) {
              logger.error(
                  "{} {} chunk start offset is inconsistent with the value in the metadata.",
                  resource.getTsFilePath(),
                  VALIDATE_FAILED);
              return false;
            }
            String measurement = header.getMeasurementID();
            List<long[]> alignedTimeBatch = null;
            if (header.getDataType() == TSDataType.VECTOR) {
              // Time chunk of an aligned series: collect its page timestamps so the value
              // chunks that follow can be decoded against them.
              alignedTimeBatch = new ArrayList<>();
              alignedTimeBatches.add(alignedTimeBatch);
            } else if (marker == MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER
                || marker == MetaMarker.VALUE_CHUNK_HEADER) {
              int timeBatchIndex = valueColumn2TimeBatchIndex.getOrDefault(measurement, 0);
              if (timeBatchIndex >= alignedTimeBatches.size()) {
                logger.error(
                    "{} {} value chunk {} has no corresponding time chunk.",
                    resource.getTsFilePath(),
                    VALIDATE_FAILED,
                    measurement);
                return false;
              }
              valueColumn2TimeBatchIndex.put(measurement, timeBatchIndex + 1);
              alignedTimeBatch = alignedTimeBatches.get(timeBatchIndex);
            }
            int dataSize = header.getDataSize();
            if (dataSize == 0) {
              // Empty value chunk: nothing to decode.
              break;
            }
            // Strip the column mask bits; only CHUNK_HEADER-style (multi-page) chunk headers
            // come with per-page statistics, otherwise the chunk metadata supplies the times.
            boolean hasPageStatistics =
                (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER;
            boolean isTimeChunk =
                (header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
                    == TsFileConstant.TIME_COLUMN_MASK;
            boolean isValueChunk =
                (header.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK)
                    == TsFileConstant.VALUE_COLUMN_MASK;
            Decoder defaultTimeDecoder =
                Decoder.getDecoderByType(
                    TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
                    TSDataType.INT64);
            Decoder valueDecoder =
                Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
            int pageIndex = 0;
            // End time of the last non-empty page of this non-aligned chunk; the next page must
            // start strictly after it.
            long lastPageEndTime = Long.MIN_VALUE;
            while (dataSize > 0) {
              // Each page is decoded from its own buffer; reset both decoders so no state
              // carries over from the previous page.
              valueDecoder.reset();
              defaultTimeDecoder.reset();
              PageHeader pageHeader = reader.readPageHeader(header.getDataType(), hasPageStatistics);
              ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
              if (isTimeChunk) {
                TimePageReader timePageReader =
                    new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
                long[] pageTimestamps = timePageReader.getNextTimeBatch();
                long pageHeaderStartTime =
                    hasPageStatistics ? pageHeader.getStartTime() : chunkMetadata.getStartTime();
                long pageHeaderEndTime =
                    hasPageStatistics ? pageHeader.getEndTime() : chunkMetadata.getEndTime();
                if (!validateTimeFrame(
                    alignedTimeBatch,
                    pageTimestamps,
                    pageHeaderStartTime,
                    pageHeaderEndTime,
                    resource)) {
                  return false;
                }
                alignedTimeBatch.add(pageTimestamps);
              } else if (isValueChunk) {
                if (pageIndex >= alignedTimeBatch.size()) {
                  logger.error(
                      "{} {} value chunk {} has more pages than its time chunk.",
                      resource.getTsFilePath(),
                      VALIDATE_FAILED,
                      measurement);
                  return false;
                }
                ValuePageReader valuePageReader =
                    new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder);
                valuePageReader.nextValueBatch(alignedTimeBatch.get(pageIndex));
              } else { // non-aligned chunk
                PageReader pageReader =
                    new PageReader(pageData, header.getDataType(), valueDecoder, defaultTimeDecoder);
                BatchData batchData = pageReader.getAllSatisfiedPageData();
                long pageHeaderStartTime =
                    hasPageStatistics ? pageHeader.getStartTime() : chunkMetadata.getStartTime();
                long pageHeaderEndTime =
                    hasPageStatistics ? pageHeader.getEndTime() : chunkMetadata.getEndTime();
                long pageStartTime = Long.MAX_VALUE;
                long previousTime = Long.MIN_VALUE;
                while (batchData.hasCurrent()) {
                  long currentTime = batchData.currentTime();
                  if (currentTime <= previousTime) {
                    logger.error(
                        "{} {} the timestamp in the page is repeated or not incremental.",
                        resource.getTsFilePath(),
                        VALIDATE_FAILED);
                    return false;
                  }
                  if (currentTime <= lastPageEndTime) {
                    logger.error(
                        "{} {} time ranges overlap between pages.",
                        resource.getTsFilePath(),
                        VALIDATE_FAILED);
                    return false;
                  }
                  pageStartTime = Math.min(pageStartTime, currentTime);
                  previousTime = currentTime;
                  batchData.next();
                }
                if (pageHeaderStartTime != pageStartTime) {
                  logger.error(
                      "{} {} the start time in page is different from that in page header.",
                      resource.getTsFilePath(),
                      VALIDATE_FAILED);
                  return false;
                }
                if (pageHeaderEndTime != previousTime) {
                  logger.error(
                      "{} {} the end time in page is different from that in page header.",
                      resource.getTsFilePath(),
                      VALIDATE_FAILED);
                  return false;
                }
                // An empty page leaves previousTime at Long.MIN_VALUE, keeping the old bound.
                lastPageEndTime = Math.max(lastPageEndTime, previousTime);
              }
              pageIndex++;
              dataSize -= pageHeader.getSerializedPageSize();
            }
            break;
          }
        case MetaMarker.CHUNK_GROUP_HEADER:
          {
            // Aligned time/value pairing is scoped to a single chunk group.
            valueColumn2TimeBatchIndex.clear();
            alignedTimeBatches.clear();
            ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
            if (chunkGroupHeader.getDeviceID() == null
                || chunkGroupHeader.getDeviceID().isEmpty()) {
              logger.error(
                  "{} {} device id is null or empty.", resource.getTsFilePath(), VALIDATE_FAILED);
              return false;
            }
            break;
          }
        case MetaMarker.OPERATION_INDEX_RANGE:
          reader.readPlanIndex();
          break;
        default:
          MetaMarker.handleUnexpectedMarker(marker);
      }
    }
  } catch (IOException
      | NegativeArraySizeException
      | IllegalArgumentException
      | IndexOutOfBoundsException e) {
    // IndexOutOfBoundsException also covers ArrayIndexOutOfBoundsException and the exception
    // thrown by List.get on malformed content.
    logger.error("Meets error when validating TsFile {}, ", resource.getTsFilePath(), e);
    return false;
  }
  return true;
}