in cpp/src/file/tsfile_io_writer.cc [316:487]
// Writes the index section of the file to write_stream_.
//
// Steps, in order:
//   1. write_separator_marker(meta_offset) and init_bloom_filter(filter).
//   2. Walk every (device, measurement, TimeseriesIndex) produced by
//      TSMIterator over chunk_group_meta_list_. Each TimeseriesIndex is
//      serialized to write_stream_, its path is added to the bloom filter,
//      and every max_degree_of_index_node_-th entry of a device is recorded
//      as a LEAF_MEASUREMENT index entry.
//   3. When the device changes (and once more after the loop) the collected
//      index nodes of the finished device are handed to add_device_node().
//   4. device_map is split by table name, build_device_level() is run per
//      table, and a TsFileMeta is serialized followed by a 32-bit value
//      holding what TsFileMeta::serialize_to() returned.
//
// Returns E_OK on success, otherwise the error code of the step that failed.
// E_NO_MORE_DATA coming out of the iteration loop is treated as success.
int TsFileIOWriter::write_file_index() {
#if DEBUG_SE
    debug_print_chunk_group_meta_list(chunk_group_meta_list_);
#endif
    int ret = E_OK;
    int64_t meta_offset = 0;
    BloomFilter filter;
    // TODO: better memory manage for this while-loop, cur_index_node_queue
    FileIndexWritingMemManager writing_mm;
    std::shared_ptr<IDeviceID> device_id;
    String measurement_name;
    TimeseriesIndex ts_index;
    std::shared_ptr<IDeviceID> prev_device_id;
    int entry_count_in_cur_device = 0;
    std::shared_ptr<IMetaIndexEntry> meta_index_entry = nullptr;
    std::shared_ptr<MetaIndexNode> cur_index_node = nullptr;
    SimpleList<std::shared_ptr<MetaIndexNode>> *cur_index_node_queue = nullptr;
    DeviceNodeMap device_map;
    TSMIterator tsm_iter(chunk_group_meta_list_);

    // cur_index_node_queue is still nullptr here, so these early returns have
    // nothing to clean up.
    if (RET_FAIL(write_separator_marker(meta_offset))) {
        return ret;
    } else if (RET_FAIL(init_bloom_filter(filter))) {
        return ret;
    }
    if (RET_FAIL(tsm_iter.init())) {
        return ret;
    }

    while (IS_SUCC(ret) && tsm_iter.has_next()) {
        ts_index.reset();  // TODO reuse
        if (RET_FAIL(
                tsm_iter.get_next(device_id, measurement_name, ts_index))) {
            break;
        }
#if DEBUG_SE
        std::cout << "tsm_iter get next = {device_name=" << device_id
                  << ", measurement_name=" << measurement_name
                  << ", ts_index=" << ts_index << std::endl;
#endif
        // prepare if it is an entry of a new device
        // NOTE(review): this compares the shared_ptr's stored pointers, not
        // the IDeviceID values; it presumably relies on TSMIterator handing
        // out the same pointer for every series of one device - confirm.
        if (prev_device_id == nullptr || prev_device_id != device_id) {
            if (prev_device_id != nullptr) {
                // Flush the index nodes gathered for the previous device.
                if (RET_FAIL(add_cur_index_node_to_queue(
                        cur_index_node, cur_index_node_queue))) {
                } else if (RET_FAIL(add_device_node(device_map, prev_device_id,
                                                    cur_index_node_queue,
                                                    writing_mm))) {
                }
            }
            if (IS_SUCC(ret)) {
                // Start a fresh queue and a fresh leaf node for this device.
                destroy_node_list(cur_index_node_queue);
                if (RET_FAIL(alloc_meta_index_node_queue(
                        writing_mm, cur_index_node_queue))) {
                } else if (RET_FAIL(alloc_and_init_meta_index_node(
                               writing_mm, cur_index_node, LEAF_MEASUREMENT))) {
                }
            }
            prev_device_id = device_id;
            entry_count_in_cur_device = 0;
        }

        // pick an entry as index entry every max_degree_of_index_node entries.
        if (IS_SUCC(ret) && entry_count_in_cur_device %
                                    g_config_value_.max_degree_of_index_node_ ==
                                0) {
            if (cur_index_node->is_full()) {
                if (RET_FAIL(add_cur_index_node_to_queue(
                        cur_index_node, cur_index_node_queue))) {
                } else if (RET_FAIL(alloc_and_init_meta_index_node(
                               writing_mm, cur_index_node, LEAF_MEASUREMENT))) {
                }
            }
            if (IS_SUCC(ret)) {
                if (RET_FAIL(alloc_and_init_meta_index_entry(
                        writing_mm, meta_index_entry, measurement_name))) {
                } else if (RET_FAIL(
                               cur_index_node->push_entry(meta_index_entry))) {
                }
            }
        }

        if (IS_SUCC(ret)) {
            OFFSET_DEBUG("before ts_index written");
            common::String tmp_device_name;
            // NOTE(review): the result of dup_from() is not checked here.
            tmp_device_name.dup_from(device_id->get_device_name(),
                                     meta_allocator_);
            // Time column also need add to bloom filter.
            // The bloom-filter status is chained with the serialization so a
            // failure here is reported instead of being overwritten by the
            // next call's result.
            if (RET_FAIL(filter.add_path_entry(tmp_device_name,
                                               measurement_name))) {
            } else if (RET_FAIL(ts_index.serialize_to(write_stream_))) {
            } else {
#if DEBUG_SE
                std::cout << "ts_index.serialize. ts_index=" << ts_index
                          << " file_pos=" << cur_file_position() << std::endl;
#endif
                entry_count_in_cur_device++;
                // add_ts_time_index_entry(ts_index);
            }
        }
    }  // end while

    // Running out of data is the normal way for the iteration to end.
    if (ret == E_NO_MORE_DATA) {
        ret = E_OK;
    }
    ASSERT(ret == E_OK);

    // iter finish: flush the nodes of the last device, if there was one.
    if (IS_SUCC(ret) && cur_index_node != nullptr &&
        cur_index_node_queue != nullptr) {
        if (RET_FAIL(add_cur_index_node_to_queue(cur_index_node,
                                                 cur_index_node_queue))) {
        } else if (RET_FAIL(add_device_node(device_map, prev_device_id,
                                            cur_index_node_queue,
                                            writing_mm))) {
        }
    }

    if (IS_SUCC(ret)) {
        TsFileMeta tsfile_meta;
        tsfile_meta.meta_offset_ = meta_offset;
        // Borrowed pointer to the stack-local filter; detached again below
        // before tsfile_meta goes out of scope.
        tsfile_meta.bloom_filter_ = &filter;

        // split device by table
        std::map<std::string, DeviceNodeMap> table_device_nodes_map;
        for (const auto &entry : device_map) {
            std::string table_name = entry.first->get_table_name();
            auto &table_map = table_device_nodes_map[table_name];
            // Keep the first node registered for a device.
            if (table_map.find(entry.first) == table_map.end()) {
                table_map[entry.first] = entry.second;
            }
        }

        std::map<std::string, std::shared_ptr<MetaIndexNode>> table_nodes_map;
        for (auto &entry : table_device_nodes_map) {
            auto meta_index_node =
                std::make_shared<MetaIndexNode>(&meta_allocator_);
            // NOTE(review): the result of build_device_level() is not checked.
            build_device_level(entry.second, meta_index_node, writing_mm);
            table_nodes_map[entry.first] = meta_index_node;
        }
        tsfile_meta.table_metadata_index_node_map_ = table_nodes_map;
        tsfile_meta.table_schemas_ = schema_->table_schema_map_;
        tsfile_meta.tsfile_properties_.insert(
            std::make_pair("encryptLevel", encrypt_level_));
        tsfile_meta.tsfile_properties_.insert(
            std::make_pair("encryptType", encrypt_type_));
        tsfile_meta.tsfile_properties_.insert(
            std::make_pair("encryptKey", encrypt_key_));
#if DEBUG_SE
        auto tsfile_meta_offset = write_stream_.total_size();
#endif
        auto total_write_size = tsfile_meta.serialize_to(write_stream_);
        // Detach the borrowed filter before any exit from this scope, so the
        // failure path below cannot leave tsfile_meta pointing at `filter`.
        tsfile_meta.bloom_filter_ = nullptr;
        if (RET_FAIL(common::SerializationUtil::write_i32(total_write_size,
                                                          write_stream_))) {
            // Fall through so the node queue is still released below.
        }
#if DEBUG_SE
        if (IS_SUCC(ret)) {
            std::cout << "writer tsfile_meta: " << tsfile_meta
                      << ", tsfile_meta_offset=" << tsfile_meta_offset
                      << ", size=" << total_write_size << std::endl;
            DEBUG_print_byte_stream("byte_stream", write_stream_);
        }
#endif
    }

    // Single cleanup point for every path taken after the loop started.
    destroy_node_list(cur_index_node_queue);
    return ret;
}