be/src/vec/functions/dictionary_factory.h (112 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gen_cpp/BackendService_types.h>

#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <utility>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
#include "vec/functions/dictionary.h"
// Forward declaration; within this header the type is only named as
// std::shared_ptr<MemTrackerLimiter>.
namespace doris {
class MemTrackerLimiter;
}
namespace doris::vectorized {
/// Registry of dictionaries keyed by dict_id.
///
/// A dictionary becomes visible to readers in two steps:
///   1. refresh_dict() stages a (version_id, dictionary) pair for a dict_id;
///   2. commit_refresh_dict() publishes the staged pair, or
///      abort_refresh_dict() discards it.
/// get() only ever returns committed dictionaries.
///
/// The methods defined inline in this class guard the three maps with `_mutex`:
/// get() takes it shared (read-only), every mutating method takes it exclusively.
class DictionaryFactory : private boost::noncopyable {
public:
    DictionaryFactory();
    ~DictionaryFactory();

    /// Looks up the committed dictionary for `dict_id`.
    ///
    /// @param dict_id     id of the dictionary to look up.
    /// @param version_id  version the caller expects; it must equal the committed version.
    /// @return the committed dictionary, or nullptr if `dict_id` has no committed
    ///         dictionary or its committed version differs from `version_id`.
    std::shared_ptr<const IDictionary> get(int64_t dict_id, int64_t version_id) {
        // Pure read: a shared lock lets concurrent readers proceed in parallel, and find()
        // (unlike operator[]) never inserts into the maps.
        std::shared_lock lc(_mutex);
        const auto dict_it = _dict_id_to_dict_map.find(dict_id);
        if (dict_it == _dict_id_to_dict_map.end()) {
            return nullptr;
        }
        // dict_id and version_id must match
        const auto version_it = _dict_id_to_version_id_map.find(dict_id);
        if (version_it == _dict_id_to_version_id_map.end() || version_it->second != version_id) {
            return nullptr;
        }
        return dict_it->second;
    }

    /// Stages `dict` as the refreshing version of `dict_id`, replacing any previously
    /// staged entry for the same id. The dictionary is not visible through get() until
    /// commit_refresh_dict() succeeds.
    ///
    /// @param dict_id     id of the dictionary being refreshed.
    /// @param version_id  version carried by `dict`.
    /// @param dict        the new dictionary; must not be null.
    /// @return OK on success, InvalidArgument if `dict` is null.
    Status refresh_dict(int64_t dict_id, int64_t version_id, DictionaryPtr dict) {
        // Reject a null dictionary up front instead of dereferencing it below.
        if (!dict) {
            return Status::InvalidArgument(
                    "Dictionary to refresh is null. dict_id: {}, version_id: {}", dict_id,
                    version_id);
        }
        VLOG_DEBUG << "DictionaryFactory refresh dictionary"
                   << " dict_id: " << dict_id << " version_id: " << version_id
                   << " dict name: " << dict->dict_name();
        std::unique_lock lc(_mutex);
        // Set the mem tracker for the dictionary
        dict->_mem_tracker = _mem_tracker;
        // `dict` is a by-value sink parameter, so move it into the staging map.
        _refreshing_dict_map.insert_or_assign(dict_id, std::make_pair(version_id, std::move(dict)));
        return Status::OK();
    }

    /// Discards the staged (refreshing) dictionary of `dict_id`.
    ///
    /// @param dict_id     id of the dictionary whose refresh is aborted.
    /// @param version_id  version of the staged dictionary to discard.
    /// @return OK if nothing is staged for `dict_id` or the staged entry was removed;
    ///         InvalidArgument if the staged version differs from `version_id`
    ///         (the staged entry is then left in place).
    Status abort_refresh_dict(int64_t dict_id, int64_t version_id) {
        VLOG_DEBUG << "DictionaryFactory abort refresh dictionary"
                   << " dict_id: " << dict_id << " version_id: " << version_id;
        std::unique_lock lc(_mutex);
        const auto refreshing_it = _refreshing_dict_map.find(dict_id);
        if (refreshing_it == _refreshing_dict_map.end()) {
            // FE will abort all, including succeed and failed.
            return Status::OK();
        }
        const int64_t refresh_version_id = refreshing_it->second.first;
        if (version_id != refresh_version_id) {
            return Status::InvalidArgument(
                    "Version ID is not equal to the refreshing version ID. {} : {}", version_id,
                    refresh_version_id);
        }
        _refreshing_dict_map.erase(refreshing_it);
        return Status::OK();
    }

    /// Publishes the staged dictionary of `dict_id` so that get() returns it.
    ///
    /// @param dict_id     id of the dictionary to commit.
    /// @param version_id  version to commit; must equal the staged version and be strictly
    ///                    greater than the currently committed version, if any.
    /// @return OK on success; InvalidArgument if nothing is staged for `dict_id`, the staged
    ///         version differs from `version_id`, or `version_id` is not greater than the
    ///         committed version. On any failure the staged entry is left in place.
    Status commit_refresh_dict(int64_t dict_id, int64_t version_id) {
        VLOG_DEBUG << "DictionaryFactory commit refresh dictionary"
                   << " dict_id: " << dict_id << " version_id: " << version_id;
        std::unique_lock lc(_mutex);
        const auto refreshing_it = _refreshing_dict_map.find(dict_id);
        if (refreshing_it == _refreshing_dict_map.end()) {
            return Status::InvalidArgument("Dictionary is not refreshing dict_id: {}", dict_id);
        }
        const int64_t refresh_version_id = refreshing_it->second.first;
        if (version_id != refresh_version_id) {
            return Status::InvalidArgument(
                    "Version ID is not equal to the refreshing version ID. {} : {}", version_id,
                    refresh_version_id);
        }
        // Refers to the staged entry; only valid until that entry is erased below.
        DictionaryPtr& dict = refreshing_it->second.second;

        // A committed version may only be replaced by a strictly newer one.
        if (const auto committed_it = _dict_id_to_version_id_map.find(dict_id);
            committed_it != _dict_id_to_version_id_map.end()) {
            const int64_t existing_version_id = committed_it->second;
            if (version_id <= existing_version_id) {
                LOG_WARNING(
                        "DictionaryFactory Failed to commit dictionary because version ID "
                        "is not greater than the existing version ID")
                        .tag("dict_id", dict_id)
                        .tag("version_id", version_id)
                        .tag("dict name", dict->dict_name())
                        .tag("existing version ID", existing_version_id);
                return Status::InvalidArgument(
                        "Version ID is not greater than the existing version ID for the "
                        "dictionary. {} : {}",
                        version_id, existing_version_id);
            }
        }

        // commit the dictionary
        LOG_INFO("DictionaryFactory Successfully commit dictionary")
                .tag("dict_id", dict_id)
                .tag("version_id", version_id)
                .tag("dict name", dict->dict_name());
        // The staged entry is erased right after, so its pointer can be moved out.
        _dict_id_to_dict_map.insert_or_assign(dict_id, std::move(dict));
        _dict_id_to_version_id_map.insert_or_assign(dict_id, version_id);
        _refreshing_dict_map.erase(refreshing_it);
        return Status::OK();
    }

    /// Removes the committed dictionary of `dict_id` together with its version record.
    /// A staged (refreshing) entry for the same id, if any, is not touched.
    ///
    /// @param dict_id  id of the dictionary to delete.
    /// @return always OK; a missing dictionary is only logged as a warning.
    Status delete_dict(int64_t dict_id) {
        VLOG_DEBUG << "DictionaryFactory delete dictionary, dict_id: " << dict_id;
        std::unique_lock lc(_mutex);
        const auto dict_it = _dict_id_to_dict_map.find(dict_id);
        if (dict_it == _dict_id_to_dict_map.end()) {
            LOG_WARNING("DictionaryFactory Failed to delete dictionary").tag("dict_id", dict_id);
            return Status::OK();
        }
        // Log before erasing: the name is read through the entry that is about to go away.
        LOG_INFO("DictionaryFactory Successfully delete dictionary")
                .tag("dict_id", dict_id)
                .tag("dict name", dict_it->second->dict_name());
        _dict_id_to_dict_map.erase(dict_it);
        _dict_id_to_version_id_map.erase(dict_id);
        return Status::OK();
    }

    /// Returns the memory tracker that refresh_dict() attaches to staged dictionaries.
    std::shared_ptr<MemTrackerLimiter> mem_tracker() const { return _mem_tracker; }

    /// Defined out of line; by its signature, fills `result` with TDictionaryStatus
    /// entries for `dict_ids`.
    void get_dictionary_status(std::vector<TDictionaryStatus>& result,
                               std::vector<int64_t> dict_ids);

private:
    // Committed dictionaries: dict_id -> dictionary.
    std::map<int64_t, DictionaryPtr> _dict_id_to_dict_map;
    // Committed versions: dict_id -> version_id. Updated together with _dict_id_to_dict_map.
    std::map<int64_t, int64_t> _dict_id_to_version_id_map;
    // Staged, not yet committed dictionaries.
    std::map<int64_t, std::pair<int64_t, DictionaryPtr>>
            _refreshing_dict_map; // dict_id -> (version_id, dict)
    // Guards the three maps above.
    std::shared_mutex _mutex;
    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};
} // namespace doris::vectorized