in cpp/src/writer/tsfile_writer.cc [411:498]
int TsFileWriter::do_check_schema_table(
std::shared_ptr<IDeviceID> device_id, Tablet &tablet,
storage::TimeChunkWriter *&time_chunk_writer,
common::SimpleVector<storage::ValueChunkWriter *> &value_chunk_writers) {
int ret = E_OK;
auto dev_it = schemas_.find(device_id);
MeasurementSchemaGroup *device_schema = NULL;
auto &schema_map = io_writer_->get_schema()->table_schema_map_;
auto table_schema_it = schema_map.find(tablet.get_table_name());
if (UNLIKELY(table_schema_it == schema_map.end())) {
return E_TABLE_NOT_EXIST;
}
auto table_schema = table_schema_it->second;
if (UNLIKELY(dev_it == schemas_.end()) ||
IS_NULL(device_schema = dev_it->second)) {
device_schema = new MeasurementSchemaGroup;
device_schema->is_aligned_ = true;
device_schema->time_chunk_writer_ = new TimeChunkWriter();
device_schema->time_chunk_writer_->init(
"", g_config_value_.time_encoding_type_,
g_config_value_.time_compress_type_);
for (uint32_t i = 0; i < table_schema->get_measurement_schemas().size();
++i) {
if (table_schema->get_column_categories().at(i) ==
common::ColumnCategory::FIELD) {
auto table_column_schema =
table_schema->get_measurement_schemas().at(i);
auto device_column_schema = new MeasurementSchema(
table_column_schema->measurement_name_,
table_column_schema->data_type_,
table_column_schema->encoding_,
table_column_schema->compression_type_);
if (!table_column_schema->props_.empty()) {
device_column_schema->props_ = table_column_schema->props_;
}
device_schema->measurement_schema_map_
[device_column_schema->measurement_name_] =
device_column_schema;
}
}
schemas_[device_id] = device_schema;
}
uint32_t column_cnt = tablet.get_column_count();
time_chunk_writer = device_schema->time_chunk_writer_;
MeasurementSchemaMap &msm = device_schema->measurement_schema_map_;
for (uint32_t i = 0; i < column_cnt; i++) {
if (tablet.column_categories_.at(i) != common::ColumnCategory::FIELD) {
continue;
}
auto ms_iter = msm.find(tablet.get_column_name(i));
if (UNLIKELY(ms_iter == msm.end())) {
value_chunk_writers.push_back(NULL);
} else {
// Here we may check data_type against ms_iter. But in Java
// libtsfile, no check here.
MeasurementSchema *ms = ms_iter->second;
if (IS_NULL(ms->value_chunk_writer_)) {
ms->value_chunk_writer_ = new ValueChunkWriter;
ret = ms->value_chunk_writer_->init(
ms->measurement_name_, ms->data_type_, ms->encoding_,
ms->compression_type_);
if (IS_SUCC(ret)) {
value_chunk_writers.push_back(ms->value_chunk_writer_);
} else {
value_chunk_writers.push_back(NULL);
for (size_t chunk_writer_idx = 0;
chunk_writer_idx < value_chunk_writers.size();
chunk_writer_idx++) {
if (!value_chunk_writers[chunk_writer_idx]) {
delete value_chunk_writers[chunk_writer_idx];
}
}
ret = common::E_INVALID_ARG;
return ret;
}
} else {
value_chunk_writers.push_back(ms->value_chunk_writer_);
}
}
}
return ret;
}