int TsFileWriter::do_check_schema_table()

in cpp/src/writer/tsfile_writer.cc [411:498]


// Resolves the chunk writers a table-model tablet must be written through
// for `device_id`, creating the per-device schema group and the individual
// value chunk writers on first use.
//
// @param device_id           key into schemas_ identifying the device.
// @param tablet              supplies the table name, the column count, the
//                            column names and the per-column categories.
// @param time_chunk_writer   [out] set to the device's time chunk writer.
// @param value_chunk_writers [out] receives one appended entry per FIELD
//                            column of the tablet, in tablet column order.
//                            The entry is NULL when the column is not part
//                            of the device schema or when its writer failed
//                            to initialise. Non-FIELD columns add no entry.
//                            The vector is appended to, never cleared here.
// @return E_OK on success; E_TABLE_NOT_EXIST when the tablet's table is not
//         registered in the writer's schema; common::E_INVALID_ARG when a
//         value chunk writer cannot be initialised.
int TsFileWriter::do_check_schema_table(
    std::shared_ptr<IDeviceID> device_id, Tablet &tablet,
    storage::TimeChunkWriter *&time_chunk_writer,
    common::SimpleVector<storage::ValueChunkWriter *> &value_chunk_writers) {
    int ret = E_OK;

    auto dev_it = schemas_.find(device_id);
    MeasurementSchemaGroup *device_schema = NULL;

    auto &schema_map = io_writer_->get_schema()->table_schema_map_;
    auto table_schema_it = schema_map.find(tablet.get_table_name());
    if (UNLIKELY(table_schema_it == schema_map.end())) {
        return E_TABLE_NOT_EXIST;
    }
    // Bind by reference: the map entry outlives this call, so there is no
    // need to copy the stored schema handle.
    const auto &table_schema = table_schema_it->second;

    // First tablet for this device (or a NULL placeholder entry): build an
    // aligned schema group that mirrors the FIELD columns of the table.
    if (UNLIKELY(dev_it == schemas_.end()) ||
        IS_NULL(device_schema = dev_it->second)) {
        device_schema = new MeasurementSchemaGroup;
        device_schema->is_aligned_ = true;
        device_schema->time_chunk_writer_ = new TimeChunkWriter();
        // NOTE(review): the result of init() is not inspected here, exactly
        // as before; confirm whether a failure needs to be reported.
        device_schema->time_chunk_writer_->init(
            "", g_config_value_.time_encoding_type_,
            g_config_value_.time_compress_type_);

        // Fetch both column lists once instead of once per iteration.
        // const& works whether the getters return by value or by reference.
        const auto &table_columns = table_schema->get_measurement_schemas();
        const auto &table_categories = table_schema->get_column_categories();
        for (uint32_t i = 0; i < table_columns.size(); ++i) {
            if (table_categories.at(i) != common::ColumnCategory::FIELD) {
                continue;
            }
            const auto &table_column_schema = table_columns.at(i);
            // The device gets its own copy of the column schema so that it
            // can carry a device-specific value chunk writer.
            auto device_column_schema = new MeasurementSchema(
                table_column_schema->measurement_name_,
                table_column_schema->data_type_,
                table_column_schema->encoding_,
                table_column_schema->compression_type_);
            if (!table_column_schema->props_.empty()) {
                device_column_schema->props_ = table_column_schema->props_;
            }
            device_schema->measurement_schema_map_
                [device_column_schema->measurement_name_] =
                device_column_schema;
        }
        schemas_[device_id] = device_schema;
    }

    uint32_t column_cnt = tablet.get_column_count();
    time_chunk_writer = device_schema->time_chunk_writer_;
    MeasurementSchemaMap &msm = device_schema->measurement_schema_map_;

    for (uint32_t i = 0; i < column_cnt; i++) {
        if (tablet.column_categories_.at(i) != common::ColumnCategory::FIELD) {
            continue;
        }
        auto ms_iter = msm.find(tablet.get_column_name(i));
        if (UNLIKELY(ms_iter == msm.end())) {
            // Unknown column: keep the slot so indices stay aligned.
            value_chunk_writers.push_back(NULL);
            continue;
        }
        // Here we may check data_type against ms_iter. But in Java
        // libtsfile, no check here.
        MeasurementSchema *ms = ms_iter->second;
        if (IS_NULL(ms->value_chunk_writer_)) {
            ms->value_chunk_writer_ = new ValueChunkWriter;
            ret = ms->value_chunk_writer_->init(
                ms->measurement_name_, ms->data_type_, ms->encoding_,
                ms->compression_type_);
            if (!IS_SUCC(ret)) {
                // Drop the writer that failed to initialise and clear the
                // cached pointer; otherwise the next call would see a
                // non-NULL writer and use it although init() never
                // succeeded. Writers handed out earlier in this loop stay
                // referenced from their measurement schemas, so they must
                // not be deleted here.
                delete ms->value_chunk_writer_;
                ms->value_chunk_writer_ = NULL;
                value_chunk_writers.push_back(NULL);
                return common::E_INVALID_ARG;
            }
        }
        value_chunk_writers.push_back(ms->value_chunk_writer_);
    }
    return ret;
}