int TsFileIOWriter::write_file_index()

in cpp/src/file/tsfile_io_writer.cc [316:487]


/**
 * Write the index section and the TsFileMeta footer to write_stream_.
 *
 * Steps, in order:
 *   1. Write the separator marker and initialise the bloom filter.
 *      (meta_offset is handed to write_separator_marker() and later stored
 *      in TsFileMeta::meta_offset_.)
 *   2. Walk every timeseries produced by TSMIterator over
 *      chunk_group_meta_list_. For each one: register the
 *      (device, measurement) pair in the bloom filter, serialize its
 *      TimeseriesIndex to write_stream_, and every
 *      max_degree_of_index_node_ entries push a measurement index entry
 *      into the current LEAF_MEASUREMENT node of the current device.
 *   3. When the device changes (and once more after the loop) hand the
 *      finished per-device node queue to add_device_node().
 *   4. Group the device nodes by table name, build one device-level index
 *      node per table, fill a TsFileMeta and serialize it, followed by its
 *      size as an i32.
 *
 * @return E_OK on success, otherwise the error code of the first failing
 *         step. E_NO_MORE_DATA coming out of the iteration loop is treated
 *         as normal termination.
 */
int TsFileIOWriter::write_file_index() {
#if DEBUG_SE
    debug_print_chunk_group_meta_list(chunk_group_meta_list_);
#endif

    int ret = E_OK;
    int64_t meta_offset = 0;
    BloomFilter filter;

    // TODO: better memory manage for this while-loop, cur_index_node_queue
    FileIndexWritingMemManager writing_mm;
    std::shared_ptr<IDeviceID> device_id;
    String measurement_name;
    TimeseriesIndex ts_index;
    std::shared_ptr<IDeviceID> prev_device_id;
    int entry_count_in_cur_device = 0;
    std::shared_ptr<IMetaIndexEntry> meta_index_entry = nullptr;
    std::shared_ptr<MetaIndexNode> cur_index_node = nullptr;
    SimpleList<std::shared_ptr<MetaIndexNode>> *cur_index_node_queue = nullptr;
    DeviceNodeMap device_map;

    TSMIterator tsm_iter(chunk_group_meta_list_);

    // Nothing has been allocated into cur_index_node_queue yet, so these
    // early returns have nothing to clean up.
    if (RET_FAIL(write_separator_marker(meta_offset))) {
        return ret;
    } else if (RET_FAIL(init_bloom_filter(filter))) {
        return ret;
    }

    if (RET_FAIL(tsm_iter.init())) {
        return ret;
    }

    while (IS_SUCC(ret) && tsm_iter.has_next()) {
        ts_index.reset();  // TODO reuse
        if (RET_FAIL(
                tsm_iter.get_next(device_id, measurement_name, ts_index))) {
            break;
        }
#if DEBUG_SE
        std::cout << "tsm_iter get next = {device_name=" << device_id
                  << ", measurement_name=" << measurement_name
                  << ", ts_index=" << ts_index << std::endl;
#endif

        // Prepare if it is an entry of a new device.
        // NOTE(review): this compares the shared_ptr values (pointer
        // identity), not the device ids they point to. It is only correct
        // if TSMIterator hands back the same shared_ptr for every entry of
        // one device - confirm against TSMIterator::get_next().
        if (prev_device_id == nullptr || prev_device_id != device_id) {
            if (prev_device_id != nullptr) {
                // Flush the node/queue that belong to the previous device.
                if (RET_FAIL(add_cur_index_node_to_queue(
                        cur_index_node, cur_index_node_queue))) {
                } else if (RET_FAIL(add_device_node(device_map, prev_device_id,
                                                    cur_index_node_queue,
                                                    writing_mm))) {
                }
            }
            if (IS_SUCC(ret)) {
                // Start a fresh queue and a fresh leaf node for this device.
                destroy_node_list(cur_index_node_queue);
                if (RET_FAIL(alloc_meta_index_node_queue(
                        writing_mm, cur_index_node_queue))) {
                } else if (RET_FAIL(alloc_and_init_meta_index_node(
                               writing_mm, cur_index_node, LEAF_MEASUREMENT))) {
                }
            }
            prev_device_id = device_id;
            entry_count_in_cur_device = 0;
        }

        // Pick an entry as index entry every max_degree_of_index_node
        // entries.
        if (IS_SUCC(ret) && entry_count_in_cur_device %
                                    g_config_value_.max_degree_of_index_node_ ==
                                0) {
            if (cur_index_node->is_full()) {
                // Current leaf is full: queue it and continue in a new one.
                if (RET_FAIL(add_cur_index_node_to_queue(
                        cur_index_node, cur_index_node_queue))) {
                } else if (RET_FAIL(alloc_and_init_meta_index_node(
                               writing_mm, cur_index_node, LEAF_MEASUREMENT))) {
                }
            }

            if (IS_SUCC(ret)) {
                if (RET_FAIL(alloc_and_init_meta_index_entry(
                        writing_mm, meta_index_entry, measurement_name))) {
                } else if (RET_FAIL(
                               cur_index_node->push_entry(meta_index_entry))) {
                }
            }
        }

        if (IS_SUCC(ret)) {
            OFFSET_DEBUG("before ts_index written");
            common::String tmp_device_name;
            // NOTE(review): the result of dup_from() is not checked and the
            // duplicated buffer is never released here; presumably
            // meta_allocator_ reclaims it later - confirm.
            tmp_device_name.dup_from(device_id->get_device_name(),
                                     meta_allocator_);
            // Time column also need add to bloom filter.
            // A bloom filter failure must stop the write: previously its
            // return code was overwritten by the serialize_to() result
            // below and therefore silently lost.
            if (RET_FAIL(
                    filter.add_path_entry(tmp_device_name, measurement_name))) {
            } else if (RET_FAIL(ts_index.serialize_to(write_stream_))) {
            } else {
#if DEBUG_SE
                std::cout << "ts_index.serialize. ts_index=" << ts_index
                          << " file_pos=" << cur_file_position() << std::endl;
#endif
                entry_count_in_cur_device++;
                // add_ts_time_index_entry(ts_index);
            }
        }
    }  // end while

    // Running out of data is the normal way for the iteration to end.
    if (ret == E_NO_MORE_DATA) {
        ret = E_OK;
    }
    ASSERT(ret == E_OK);

    // Iteration finished: flush the node/queue of the last device, if any.
    if (IS_SUCC(ret) && cur_index_node != nullptr &&
        cur_index_node_queue != nullptr) {
        if (RET_FAIL(add_cur_index_node_to_queue(cur_index_node,
                                                 cur_index_node_queue))) {
        } else if (RET_FAIL(add_device_node(device_map, prev_device_id,
                                            cur_index_node_queue,
                                            writing_mm))) {
        }
    }

    if (IS_SUCC(ret)) {
        TsFileMeta tsfile_meta;
        tsfile_meta.meta_offset_ = meta_offset;
        // Borrowed pointer to the stack-local filter; it is detached again
        // below before tsfile_meta goes out of scope.
        tsfile_meta.bloom_filter_ = &filter;

        // Split device by table.
        std::map<std::string, DeviceNodeMap> table_device_nodes_map;
        for (const auto &entry : device_map) {
            std::string table_name = entry.first->get_table_name();
            auto &table_map = table_device_nodes_map[table_name];
            // Keep the first node seen for a device; find() on an empty map
            // already yields end(), so no separate empty() test is needed.
            if (table_map.find(entry.first) == table_map.end()) {
                table_map[entry.first] = entry.second;
            }
        }

        // Build one device-level index node per table.
        std::map<std::string, std::shared_ptr<MetaIndexNode>> table_nodes_map;
        for (auto &entry : table_device_nodes_map) {
            auto meta_index_node =
                std::make_shared<MetaIndexNode>(&meta_allocator_);
            // NOTE(review): any status returned by build_device_level() is
            // ignored here - confirm whether it can fail.
            build_device_level(entry.second, meta_index_node, writing_mm);
            table_nodes_map[entry.first] = meta_index_node;
        }
        tsfile_meta.table_metadata_index_node_map_ = table_nodes_map;
        tsfile_meta.table_schemas_ = schema_->table_schema_map_;
        tsfile_meta.tsfile_properties_.insert(
            std::make_pair("encryptLevel", encrypt_level_));
        tsfile_meta.tsfile_properties_.insert(
            std::make_pair("encryptType", encrypt_type_));
        tsfile_meta.tsfile_properties_.insert(
            std::make_pair("encryptKey", encrypt_key_));
#if DEBUG_SE
        auto tsfile_meta_offset = write_stream_.total_size();
#endif
        // The value returned by serialize_to() is written right after the
        // meta itself as an i32.
        auto total_write_size = tsfile_meta.serialize_to(write_stream_);
        if (RET_FAIL(common::SerializationUtil::write_i32(total_write_size,
                                                          write_stream_))) {
            // Do not return here: fall through so that the borrowed bloom
            // filter pointer is detached and the node queue is destroyed.
        }
        // Always detach, on success and on failure, so tsfile_meta never
        // outlives this scope while still pointing at the local filter.
        tsfile_meta.bloom_filter_ = nullptr;
#if DEBUG_SE
        std::cout << "writer tsfile_meta: " << tsfile_meta
                  << ", tsfile_meta_offset=" << tsfile_meta_offset
                  << ", size=" << total_write_size << std::endl;
        DEBUG_print_byte_stream("byte_stream", write_stream_);
#endif
    }
    destroy_node_list(cur_index_node_queue);
    return ret;
}