cpp/velox/compute/VeloxBackend.h (50 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <boost/algorithm/string.hpp> #include <boost/lexical_cast.hpp> #include <boost/uuid/uuid_generators.hpp> #include <boost/uuid/uuid_io.hpp> #include <folly/executors/IOThreadPoolExecutor.h> #include <filesystem> #include "velox/common/caching/AsyncDataCache.h" #include "velox/common/memory/MemoryPool.h" #include "velox/common/memory/MmapAllocator.h" #include "velox/core/Config.h" namespace gluten { /// As a static instance in per executor, initialized at executor startup. /// Should not put heavily work here. class VeloxBackend { public: ~VeloxBackend() { if (dynamic_cast<facebook::velox::cache::AsyncDataCache*>(asyncDataCache_.get())) { LOG(INFO) << asyncDataCache_->toString(); for (const auto& entry : std::filesystem::directory_iterator(cachePathPrefix_)) { if (entry.path().filename().string().find(cacheFilePrefix_) != std::string::npos) { LOG(INFO) << "Removing cache file " << entry.path().filename().string(); std::filesystem::remove(cachePathPrefix_ + "/" + entry.path().filename().string()); } } asyncDataCache_->shutdown(); } } static void create(const std::unordered_map<std::string, std::string>& conf); static VeloxBackend* get(); facebook::velox::cache::AsyncDataCache* getAsyncDataCache() const; private: explicit VeloxBackend(const std::unordered_map<std::string, std::string>& conf) { init(conf); } void init(const std::unordered_map<std::string, std::string>& conf); void initCache(const std::shared_ptr<const facebook::velox::Config>& conf); void initConnector(const std::shared_ptr<const facebook::velox::Config>& conf); void initUdf(const std::shared_ptr<const facebook::velox::Config>& conf); void initJolFilesystem(const std::shared_ptr<const facebook::velox::Config>& conf); std::string getCacheFilePrefix() { return "cache." + boost::lexical_cast<std::string>(boost::uuids::random_generator()()) + "."; } static std::unique_ptr<VeloxBackend> instance_; // Instance of AsyncDataCache used for all large allocations. std::shared_ptr<facebook::velox::cache::AsyncDataCache> asyncDataCache_; std::unique_ptr<folly::IOThreadPoolExecutor> ssdCacheExecutor_; std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_; std::shared_ptr<facebook::velox::memory::MmapAllocator> cacheAllocator_; std::string cachePathPrefix_; std::string cacheFilePrefix_; }; } // namespace gluten