in java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java [2064:2366]
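/**
 * Self-checks this TsFile and collects the metadata of every complete chunk group found in the
 * data section. (Descriptive Javadoc added here for clarity; the behavior described is taken
 * from the method body below.)
 *
 * @param schema the schema into which recovered timeseries are registered (skipped if null)
 * @param chunkGroupMetadataList output list that receives the ChunkGroupMetadata of every
 *     complete chunk group in the data section
 * @param fastFinish if true, return as soon as the file is found to be complete
 * @return TsFileCheckStatus.COMPLETE_FILE if the file is complete,
 *     TsFileCheckStatus.INCOMPATIBLE_FILE or TsFileCheckStatus.FILE_NOT_FOUND for a broken or
 *     missing file, otherwise the position to which the file can safely be truncated
 */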
public long selfCheck(
Schema schema, List<ChunkGroupMetadata> chunkGroupMetadataList, boolean fastFinish)
throws IOException {
File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
long fileSize;
if (!checkFile.exists()) {
return TsFileCheckStatus.FILE_NOT_FOUND;
} else {
fileSize = checkFile.length();
}
ChunkMetadata currentChunk;
String measurementID;
TSDataType dataType;
long fileOffsetOfChunk;
// ChunkMetadata of current ChunkGroup
List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
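// the file header consists of the magic string followed by a one-byte version number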
int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES;
if (fileSize < headerLength) {
return TsFileCheckStatus.INCOMPATIBLE_FILE;
}
if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic())) {
return TsFileCheckStatus.INCOMPATIBLE_FILE;
}
fileVersion = readVersionNumber();
checkFileVersion();
tsFileInput.position(headerLength);
boolean isComplete = isComplete();
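// if only the header has been written, the safe truncation point is right after the header;
// if the file is already complete and a quick check suffices, return immediately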
if (fileSize == headerLength) {
return headerLength;
} else if (isComplete) {
loadMetadataSize();
if (fastFinish) {
return TsFileCheckStatus.COMPLETE_FILE;
}
}
// if the file is not complete, scan the data section and recover what we can
long truncatedSize = headerLength;
byte marker;
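// timeBatch caches the decoded timestamps of the aligned time chunks in the current chunk
// group; valueColumn2TimeBatchIndex records, for each value column, which cached time batch
// its next value chunk pairs with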
List<long[]> timeBatch = new ArrayList<>();
IDeviceID lastDeviceId = null;
List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
Map<String, Integer> valueColumn2TimeBatchIndex = new HashMap<>();
try {
while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
case MetaMarker.CHUNK_HEADER:
case MetaMarker.TIME_CHUNK_HEADER:
case MetaMarker.VALUE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
fileOffsetOfChunk = this.position() - 1;
// if there is something wrong with a chunk, we drop the whole ChunkGroup,
// as different chunks may be created by the same insertions (SQLs) and a partial
// insertion cannot be tolerated
ChunkHeader chunkHeader = this.readChunkHeader(marker);
measurementID = chunkHeader.getMeasurementID();
IMeasurementSchema measurementSchema =
new MeasurementSchema(
measurementID,
chunkHeader.getDataType(),
chunkHeader.getEncodingType(),
chunkHeader.getCompressionType());
measurementSchemaList.add(measurementSchema);
dataType = chunkHeader.getDataType();
Statistics<? extends Serializable> chunkStatistics =
Statistics.getStatsByType(dataType);
int dataSize = chunkHeader.getDataSize();
if (dataSize > 0) {
if (marker == MetaMarker.TIME_CHUNK_HEADER) {
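// a multi-page time chunk is never decoded here, so add a placeholder to keep the
// time-batch indices of the value columns aligned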
timeBatch.add(null);
}
if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.CHUNK_HEADER) {
// more than one page: use the page statistics to generate the chunk statistics
if (marker == MetaMarker.VALUE_CHUNK_HEADER) {
int timeBatchIndex =
valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0);
valueColumn2TimeBatchIndex.put(
chunkHeader.getMeasurementID(), timeBatchIndex + 1);
}
while (dataSize > 0) {
// a new Page
PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), true);
if (pageHeader.getUncompressedSize() != 0) {
// not empty page
chunkStatistics.mergeStatistics(pageHeader.getStatistics());
}
this.skipPageData(pageHeader);
dataSize -= pageHeader.getSerializedPageSize();
chunkHeader.increasePageNums(1);
}
} else {
// only one page without statistics: iterate over each point to generate the chunk statistics
PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), false);
Decoder valueDecoder =
Decoder.getDecoderByType(
chunkHeader.getEncodingType(), chunkHeader.getDataType());
ByteBuffer pageData = readPage(pageHeader, chunkHeader.getCompressionType());
Decoder timeDecoder =
Decoder.getDecoderByType(
TSEncoding.valueOf(
TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
TSDataType.INT64);
if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
== TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk with only one page
TimePageReader timePageReader =
new TimePageReader(pageHeader, pageData, timeDecoder);
long[] currentTimeBatch = timePageReader.getNextTimeBatch();
timeBatch.add(currentTimeBatch);
for (long currentTime : currentTimeBatch) {
chunkStatistics.update(currentTime);
}
} else if ((chunkHeader.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK)
== TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk with only one page
ValuePageReader valuePageReader =
new ValuePageReader(
pageHeader, pageData, chunkHeader.getDataType(), valueDecoder);
int timeBatchIndex =
valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0);
valueColumn2TimeBatchIndex.put(
chunkHeader.getMeasurementID(), timeBatchIndex + 1);
TsPrimitiveType[] valueBatch =
valuePageReader.nextValueBatch(timeBatch.get(timeBatchIndex));
if (valueBatch != null && valueBatch.length != 0) {
for (int i = 0; i < valueBatch.length; i++) {
TsPrimitiveType value = valueBatch[i];
if (value == null) {
continue;
}
long timeStamp = timeBatch.get(timeBatchIndex)[i];
switch (dataType) {
case INT32:
case DATE:
chunkStatistics.update(timeStamp, value.getInt());
break;
case INT64:
case TIMESTAMP:
chunkStatistics.update(timeStamp, value.getLong());
break;
case FLOAT:
chunkStatistics.update(timeStamp, value.getFloat());
break;
case DOUBLE:
chunkStatistics.update(timeStamp, value.getDouble());
break;
case BOOLEAN:
chunkStatistics.update(timeStamp, value.getBoolean());
break;
case TEXT:
case BLOB:
case STRING:
chunkStatistics.update(timeStamp, value.getBinary());
break;
default:
throw new IOException("Unexpected type " + dataType);
}
}
}
} else { // NonAligned Chunk with only one page
PageReader reader =
new PageReader(
pageHeader,
pageData,
chunkHeader.getDataType(),
valueDecoder,
timeDecoder);
BatchData batchData = reader.getAllSatisfiedPageData();
while (batchData.hasCurrent()) {
switch (dataType) {
case INT32:
case DATE:
chunkStatistics.update(batchData.currentTime(), batchData.getInt());
break;
case INT64:
case TIMESTAMP:
chunkStatistics.update(batchData.currentTime(), batchData.getLong());
break;
case FLOAT:
chunkStatistics.update(batchData.currentTime(), batchData.getFloat());
break;
case DOUBLE:
chunkStatistics.update(batchData.currentTime(), batchData.getDouble());
break;
case BOOLEAN:
chunkStatistics.update(batchData.currentTime(), batchData.getBoolean());
break;
case TEXT:
case BLOB:
case STRING:
chunkStatistics.update(batchData.currentTime(), batchData.getBinary());
break;
default:
throw new IOException("Unexpected type " + dataType);
}
batchData.next();
}
}
chunkHeader.increasePageNums(1);
}
} else if (marker == MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER
|| marker == MetaMarker.VALUE_CHUNK_HEADER) {
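// an empty value chunk still consumes one time batch, so advance this column's index
// to keep later value chunks aligned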
int timeBatchIndex =
valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0);
valueColumn2TimeBatchIndex.put(chunkHeader.getMeasurementID(), timeBatchIndex + 1);
}
currentChunk =
new ChunkMetadata(
measurementID,
dataType,
chunkHeader.getEncodingType(),
chunkHeader.getCompressionType(),
fileOffsetOfChunk,
chunkStatistics);
chunkMetadataList.add(currentChunk);
break;
case MetaMarker.CHUNK_GROUP_HEADER:
// if there is something wrong with the ChunkGroup header, we will drop this ChunkGroup
// because we cannot guarantee the correctness of the deviceId.
truncatedSize = this.position() - 1;
if (lastDeviceId != null) {
// schema of last chunk group
if (schema != null) {
for (IMeasurementSchema tsSchema : measurementSchemaList) {
schema.registerTimeseries(lastDeviceId, tsSchema);
}
}
measurementSchemaList = new ArrayList<>();
// last chunk group Metadata
chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList));
}
// start collecting chunk metadata for the new chunk group
chunkMetadataList = new ArrayList<>();
ChunkGroupHeader chunkGroupHeader = this.readChunkGroupHeader();
lastDeviceId = chunkGroupHeader.getDeviceID();
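// a new chunk group starts a new set of aligned chunks, so drop the cached time batches
// and reset the per-column counters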
timeBatch.clear();
valueColumn2TimeBatchIndex.clear();
break;
case MetaMarker.OPERATION_INDEX_RANGE:
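// a plan-index (operation index) marker also ends the previous chunk group, so flush its
// metadata before reading past the plan index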
truncatedSize = this.position() - 1;
if (lastDeviceId != null) {
// schema of last chunk group
if (schema != null) {
for (IMeasurementSchema tsSchema : measurementSchemaList) {
schema.registerTimeseries(lastDeviceId, tsSchema);
}
}
measurementSchemaList = new ArrayList<>();
// last chunk group Metadata
chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList));
lastDeviceId = null;
}
readPlanIndex();
truncatedSize = this.position();
break;
default:
// the file on disk is corrupted; continuing to use it may be dangerous
throw new IOException("Unexpected marker " + marker);
}
}
// we have read to the end of the data section, so the last chunk group is
// guaranteed to be complete.
if (lastDeviceId != null) {
// schema of last chunk group
if (schema != null) {
for (IMeasurementSchema tsSchema : measurementSchemaList) {
schema.registerTimeseries(lastDeviceId, tsSchema);
}
}
// last chunk group Metadata
chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList));
}
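// the loop exited at the SEPARATOR marker: if the file also has a complete tail, report it
// as complete; otherwise the file can be truncated right before the separator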
if (isComplete) {
truncatedSize = TsFileCheckStatus.COMPLETE_FILE;
} else {
truncatedSize = this.position() - 1;
}
} catch (Exception e) {
logger.warn(
"TsFile {} self-check cannot proceed at position {} " + "recovered, because : {}",
file,
this.position(),
e.getMessage());
}
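// update the table schemas with the recovered chunk groups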
if (schema != null) {
for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
schema.updateTableSchema(chunkGroupMetadata);
}
}
// Regardless of whether the data section is complete, the current FileMetadata is discarded
// so that data can continue to be written into this TsFile.
return truncatedSize;
}