cpp/velox/compute/VeloxBackend.h (67 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <boost/algorithm/string.hpp> #include <boost/lexical_cast.hpp> #include <boost/uuid/uuid_generators.hpp> #include <boost/uuid/uuid_io.hpp> #include <folly/executors/IOThreadPoolExecutor.h> #include <filesystem> #include "velox/common/caching/AsyncDataCache.h" #include "velox/common/config/Config.h" #include "velox/common/memory/MmapAllocator.h" #include "memory/VeloxMemoryManager.h" namespace gluten { // This kind string must be same with VeloxBackend#name in java side. inline static const std::string kVeloxBackendKind{"velox"}; /// As a static instance in per executor, initialized at executor startup. /// Should not put heavily work here. class VeloxBackend { public: ~VeloxBackend() {} static void create( std::unique_ptr<AllocationListener> listener, const std::unordered_map<std::string, std::string>& conf); static VeloxBackend* get(); facebook::velox::cache::AsyncDataCache* getAsyncDataCache() const; std::shared_ptr<facebook::velox::config::ConfigBase> getBackendConf() const { return backendConf_; } VeloxMemoryManager* getGlobalMemoryManager() const { return globalMemoryManager_.get(); } void tearDown() { // Destruct IOThreadPoolExecutor will join all threads. // On threads exit, thread local variables can be constructed with referencing global variables. // So, we need to destruct IOThreadPoolExecutor and stop the threads before global variables get destructed. ioExecutor_.reset(); globalMemoryManager_.reset(); // dump cache stats on exit if enabled if (dynamic_cast<facebook::velox::cache::AsyncDataCache*>(asyncDataCache_.get())) { LOG(INFO) << asyncDataCache_->toString(); for (const auto& entry : std::filesystem::directory_iterator(cachePathPrefix_)) { if (entry.path().filename().string().find(cacheFilePrefix_) != std::string::npos) { LOG(INFO) << "Removing cache file " << entry.path().filename().string(); std::filesystem::remove(cachePathPrefix_ + "/" + entry.path().filename().string()); } } asyncDataCache_->shutdown(); } } private: explicit VeloxBackend( std::unique_ptr<AllocationListener> listener, const std::unordered_map<std::string, std::string>& conf) { init(std::move(listener), conf); } void init(std::unique_ptr<AllocationListener> listener, const std::unordered_map<std::string, std::string>& conf); void initCache(); void initConnector(); void initUdf(); std::unique_ptr<facebook::velox::cache::SsdCache> initSsdCache(uint64_t ssdSize); void initJolFilesystem(); std::string getCacheFilePrefix() { return "cache." + boost::lexical_cast<std::string>(boost::uuids::random_generator()()) + "."; } static std::unique_ptr<VeloxBackend> instance_; // A global Velox memory manager for the current process. std::unique_ptr<VeloxMemoryManager> globalMemoryManager_; // Instance of AsyncDataCache used for all large allocations. std::shared_ptr<facebook::velox::cache::AsyncDataCache> asyncDataCache_; std::unique_ptr<folly::IOThreadPoolExecutor> ssdCacheExecutor_; std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_; std::shared_ptr<facebook::velox::memory::MmapAllocator> cacheAllocator_; std::string cachePathPrefix_; std::string cacheFilePrefix_; std::shared_ptr<facebook::velox::config::ConfigBase> backendConf_; }; } // namespace gluten