cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp (368 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "ShuffleSplitter.h"
#include <algorithm>
#include <filesystem>
#include <format>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>
#include <fcntl.h>
#include <Compression/CompressionFactory.h>
#include <Storages/IO/AggregateSerializationUtils.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Parser/SerializedPlanParser.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <Poco/StringTokenizer.h>
#include <Common/Stopwatch.h>
#include <Common/DebugUtils.h>

namespace local_engine
{
namespace
{
/// Tokenizes `exprs` on ',' and appends every token, parsed with std::stoi, to `indices`.
/// Shared by the splitter constructors, which all read column positions this way.
template <typename Container>
void appendColumnIndices(const std::string & exprs, Container & indices)
{
    Poco::StringTokenizer tokenizer(exprs, ",");
    for (const auto & token : tokenizer)
        indices.push_back(std::stoi(token));
}
}

/// Entry point for one input block: computes the partition of every row, converts
/// aggregate-state columns and appends the rows to the per-partition buffers.
/// Empty blocks are ignored. `block` is modified in place.
void ShuffleSplitter::split(DB::Block & block)
{
    if (block.rows() == 0)
    {
        return;
    }
    initOutputIfNeeded(block);
    computeAndCountPartitionId(block);
    Stopwatch split_time_watch;
    block = convertAggregateStateInBlock(block);
    split_result.total_split_time += split_time_watch.elapsedNanoseconds();
    splitBlockByPartition(block);
}

/// Spills every partition, collects the timing counters, merges the per-partition
/// temp files into the final data file and returns the accumulated statistics.
SplitResult ShuffleSplitter::stop()
{
    Stopwatch watch;
    // Spill all buffers and push the written data down to the underlying buffers.
    for (size_t i = 0; i < options.partition_num; i++)
    {
        spillPartition(i);
        partition_outputs[i]->flush();
        partition_write_buffers[i]->sync();
    }
    for (auto * item : compressed_buffers)
    {
        if (item)
        {
            split_result.total_compress_time += item->getCompressTime();
            split_result.total_io_time += item->getWriteTime();
        }
    }
    split_result.total_serialize_time
        = split_result.total_spill_time - split_result.total_compress_time - split_result.total_io_time;
    partition_outputs.clear();
    // When compression is on, partition_write_buffers holds wrappers constructed over the
    // buffers stored in partition_cached_write_buffers (see getPartitionWriteBuffer), so the
    // wrappers must be released before the buffers they reference.
    partition_write_buffers.clear();
    partition_cached_write_buffers.clear();
    mergePartitionFiles();
    split_result.total_write_time += watch.elapsedNanoseconds();
    stopped = true;
    return split_result;
}

/// Lazily builds `output_header` from the first block seen. When no output columns were
/// configured, all columns of `block` are selected in their original order.
void ShuffleSplitter::initOutputIfNeeded(Block & block)
{
    if (output_header.columns() == 0) [[unlikely]]
    {
        if (output_columns_indicies.empty())
        {
            output_header = block.cloneEmpty();
            for (size_t i = 0; i < block.columns(); ++i)
            {
                output_columns_indicies.push_back(i);
            }
        }
        else
        {
            ColumnsWithTypeAndName cols;
            for (const auto & index : output_columns_indicies)
            {
                cols.push_back(block.getByPosition(index));
            }
            output_header = DB::Block(cols);
        }
    }
}

/// Projects `block` onto the output columns, scatters its rows into the per-partition
/// buffers according to `partition_info`, then spills every buffer that has reached
/// `options.split_size` rows.
void ShuffleSplitter::splitBlockByPartition(DB::Block & block)
{
    Stopwatch split_time_watch;
    DB::Block out_block;
    for (size_t col = 0; col < output_header.columns(); ++col)
    {
        out_block.insert(block.getByPosition(output_columns_indicies[col]));
    }
    for (size_t col = 0; col < output_header.columns(); ++col)
    {
        for (size_t j = 0; j < partition_info.partition_num; ++j)
        {
            size_t from = partition_info.partition_start_points[j];
            size_t length = partition_info.partition_start_points[j + 1] - from;
            if (length == 0)
                continue; // no data for this partition
            partition_buffer[j]->appendSelective(col, out_block, partition_info.partition_selector, from, length);
        }
    }
    split_result.total_split_time += split_time_watch.elapsedNanoseconds();

    for (size_t i = 0; i < options.partition_num; ++i)
    {
        auto & buffer = partition_buffer[i];
        if (buffer->size() >= options.split_size)
        {
            spillPartition(i);
        }
    }
}

ShuffleSplitter::ShuffleSplitter(const SplitOptions & options_) : options(options_)
{
    init();
}

/// Sizes all per-partition containers to `options.partition_num` and zeroes the length counters.
void ShuffleSplitter::init()
{
    partition_buffer.resize(options.partition_num);
    partition_outputs.resize(options.partition_num);
    partition_write_buffers.resize(options.partition_num);
    partition_cached_write_buffers.resize(options.partition_num);
    split_result.partition_lengths.resize(options.partition_num);
    split_result.raw_partition_lengths.resize(options.partition_num);
    for (size_t partition_i = 0; partition_i < options.partition_num; ++partition_i)
    {
        partition_buffer[partition_i] = std::make_shared<ColumnsBuffer>(options.split_size);
        split_result.partition_lengths[partition_i] = 0;
        split_result.raw_partition_lengths[partition_i] = 0;
    }
}

/// Writes the rows buffered for `partition_id` to its output, creating the write buffer
/// and the writer on first use. The output is created even when there are no rows to write.
void ShuffleSplitter::spillPartition(size_t partition_id)
{
    Stopwatch watch;
    if (!partition_outputs[partition_id])
    {
        partition_write_buffers[partition_id] = getPartitionWriteBuffer(partition_id);
        partition_outputs[partition_id]
            = std::make_unique<NativeWriter>(*partition_write_buffers[partition_id], output_header);
    }
    DB::Block result = partition_buffer[partition_id]->releaseColumns();
    if (result.rows() > 0)
    {
        partition_outputs[partition_id]->write(result);
    }
    split_result.total_spill_time += watch.elapsedNanoseconds();
    split_result.total_bytes_spilled += result.bytes();
}

/// Concatenates the per-partition temp files, in partition order, into `options.data_file`,
/// recording the number of bytes copied for each partition and deleting each temp file.
void ShuffleSplitter::mergePartitionFiles()
{
    Stopwatch merge_io_time;
    DB::WriteBufferFromFile data_write_buffer = DB::WriteBufferFromFile(options.data_file);
    const size_t buffer_size = options.io_buffer_size;
    // The buffer must be sized, not merely reserved: readBig() writes buffer_size bytes
    // through data(), and writing past size() of a std::string is undefined behaviour.
    std::string buffer(buffer_size, '\0');
    for (size_t i = 0; i < options.partition_num; ++i)
    {
        auto file = getPartitionTempFile(i);
        DB::ReadBufferFromFile reader = DB::ReadBufferFromFile(file, options.io_buffer_size);
        while (reader.next())
        {
            auto bytes = reader.readBig(buffer.data(), buffer_size);
            data_write_buffer.write(buffer.data(), bytes);
            split_result.partition_lengths[i] += bytes;
            split_result.total_bytes_written += bytes;
        }
        reader.close();
        std::filesystem::remove(file);
    }
    split_result.total_io_time += merge_io_time.elapsedNanoseconds();
    data_write_buffer.close();
}

/// Factory: maps a short splitter name ("rr", "hash", "single", "range") to an implementation.
/// "single" is a round-robin splitter forced to one partition.
/// Throws std::runtime_error for any other name.
ShuffleSplitterPtr ShuffleSplitter::create(const std::string & short_name, const SplitOptions & options_)
{
    if (short_name == "rr")
        return RoundRobinSplitter::create(options_);
    if (short_name == "hash")
        return HashSplitter::create(options_);
    if (short_name == "single")
    {
        SplitOptions options = options_;
        options.partition_num = 1;
        return RoundRobinSplitter::create(options);
    }
    if (short_name == "range")
        return RangeSplitter::create(options_);
    throw std::runtime_error("unsupported splitter " + short_name);
}

/// Returns the temp file path for `partition_id`, creating its directory if needed.
/// The file name is "<shuffle_id>_<map_id>_<partition_id>"; its hash selects one of the
/// configured local dirs and a two-digit hex sub directory beneath it.
/// Throws std::runtime_error when no local dir is configured or `num_sub_dirs` is zero.
std::string ShuffleSplitter::getPartitionTempFile(size_t partition_id)
{
    // Both values are used as divisors below; a zero would be a division by zero.
    if (options.local_dirs_list.size() == 0 || options.num_sub_dirs == 0)
        throw std::runtime_error("shuffle local dirs list is empty or num_sub_dirs is zero");

    auto file_name = std::to_string(options.shuffle_id) + "_" + std::to_string(options.map_id) + "_"
        + std::to_string(partition_id);
    std::hash<std::string> hasher;
    auto hash = hasher(file_name);
    auto dir_id = hash % options.local_dirs_list.size();
    auto sub_dir_id = (hash / options.local_dirs_list.size()) % options.num_sub_dirs;

    std::string dir = std::filesystem::path(options.local_dirs_list[dir_id]) / std::format("{:02x}", sub_dir_id);
    // create_directories() is a no-op for an existing directory, so no exists() check is needed.
    std::filesystem::create_directories(dir);
    return std::filesystem::path(dir) / file_name;
}

/// Returns the buffer that the writer of `partition_id` should write into.
/// With a recognised compress method the result is a compressing wrapper around the file
/// buffer, which stays owned by `partition_cached_write_buffers`; otherwise ownership of
/// the file buffer itself is handed to the caller.
std::unique_ptr<DB::WriteBuffer> ShuffleSplitter::getPartitionWriteBuffer(size_t partition_id)
{
    auto file = getPartitionTempFile(partition_id);
    if (partition_cached_write_buffers[partition_id] == nullptr)
        partition_cached_write_buffers[partition_id]
            = std::make_unique<DB::WriteBufferFromFile>(file, options.io_buffer_size, O_CREAT | O_WRONLY | O_APPEND);
    if (!options.compress_method.empty()
        && std::find(compress_methods.begin(), compress_methods.end(), options.compress_method) != compress_methods.end())
    {
        auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(options.compress_method), {});
        auto compressed = std::make_unique<CompressedWriteBuffer>(*partition_cached_write_buffers[partition_id], codec);
        // Remember the wrapper so stop() can read its compress/write timings.
        compressed_buffers.emplace_back(compressed.get());
        return compressed;
    }
    else
    {
        return std::move(partition_cached_write_buffers[partition_id]);
    }
}

/// Writes "<data_file>.index": one partition length per line, as decimal text.
void ShuffleSplitter::writeIndexFile()
{
    auto index_file = options.data_file + ".index";
    auto writer = std::make_unique<DB::WriteBufferFromFile>(index_file, options.io_buffer_size, O_CREAT | O_WRONLY | O_TRUNC);
    for (auto len : split_result.partition_lengths)
    {
        DB::writeIntText(len, *writer);
        DB::writeChar('\n', *writer);
    }
}

/// Appends rows [start, end) of every column of `block` to the accumulated columns.
/// The first call fixes the header and allocates the accumulated columns.
void ColumnsBuffer::add(DB::Block & block, int start, int end)
{
    if (!header)
        header = block.cloneEmpty();

    if (accumulated_columns.empty())
    {
        accumulated_columns.reserve(block.columns());
        for (size_t i = 0; i < block.columns(); i++)
        {
            // getByPosition() avoids materialising the whole column vector per iteration.
            auto column = block.getByPosition(i).column->cloneEmpty();
            column->reserve(prefer_buffer_size);
            accumulated_columns.emplace_back(std::move(column));
        }
    }

    assert(!accumulated_columns.empty());
    for (size_t i = 0; i < block.columns(); ++i)
    {
        if (!accumulated_columns[i]->onlyNull())
        {
            accumulated_columns[i]->insertRangeFrom(*block.getByPosition(i).column, start, end - start);
        }
        else
        {
            accumulated_columns[i]->insertMany(DB::Field(), end - start);
        }
    }
}

/// Appends to accumulated column `column_idx` the rows of `source` addressed by
/// `selector[from, from + length)`. Const and sparse source columns are converted to
/// full columns first. The first call fixes the header and allocates all accumulated columns.
void ColumnsBuffer::appendSelective(
    size_t column_idx, const DB::Block & source, const DB::IColumn::Selector & selector, size_t from, size_t length)
{
    if (!header)
        header = source.cloneEmpty();

    if (accumulated_columns.empty())
    {
        accumulated_columns.reserve(source.columns());
        for (size_t i = 0; i < source.columns(); i++)
        {
            // getByPosition() avoids materialising the whole column vector per iteration.
            auto column
                = source.getByPosition(i).column->convertToFullColumnIfConst()->convertToFullColumnIfSparse()->cloneEmpty();
            column->reserve(prefer_buffer_size);
            accumulated_columns.emplace_back(std::move(column));
        }
    }

    if (!accumulated_columns[column_idx]->onlyNull())
    {
        accumulated_columns[column_idx]->insertRangeSelective(
            *source.getByPosition(column_idx).column->convertToFullColumnIfConst()->convertToFullColumnIfSparse(),
            selector,
            from,
            length);
    }
    else
    {
        accumulated_columns[column_idx]->insertMany(DB::Field(), length);
    }
}

/// Number of buffered rows, taken from the first accumulated column (0 when nothing is buffered).
size_t ColumnsBuffer::size() const
{
    return accumulated_columns.empty() ? 0 : accumulated_columns[0]->size();
}

bool ColumnsBuffer::empty() const
{
    return accumulated_columns.empty() ? true : accumulated_columns[0]->empty();
}

/// Moves the accumulated columns out as a block shaped like the header and leaves the
/// buffer empty. Returns an empty clone of the header when nothing was accumulated.
DB::Block ColumnsBuffer::releaseColumns()
{
    DB::Columns columns(
        std::make_move_iterator(accumulated_columns.begin()), std::make_move_iterator(accumulated_columns.end()));
    accumulated_columns.clear();

    if (columns.empty())
        return header.cloneEmpty();
    else
        return header.cloneWithColumns(columns);
}

DB::Block ColumnsBuffer::getHeader()
{
    return header;
}

ColumnsBuffer::ColumnsBuffer(size_t prefer_buffer_size_) : prefer_buffer_size(prefer_buffer_size_)
{
}

/// Reads the output column positions from `out_exprs` and sets up a round-robin selector.
RoundRobinSplitter::RoundRobinSplitter(const SplitOptions & options_) : ShuffleSplitter(options_)
{
    appendColumnIndices(options_.out_exprs, output_columns_indicies);
    selector_builder = std::make_unique<RoundRobinSelectorBuilder>(options.partition_num);
}

void RoundRobinSplitter::computeAndCountPartitionId(DB::Block & block)
{
    Stopwatch watch;
    partition_info = selector_builder->build(block);
    split_result.total_compute_pid_time += watch.elapsedNanoseconds();
}

ShuffleSplitterPtr RoundRobinSplitter::create(const SplitOptions & options_)
{
    return std::make_unique<RoundRobinSplitter>(options_);
}

/// Reads the hash key positions from `hash_exprs` and the output column positions from
/// `out_exprs`, then sets up a hash selector using `hash_algorithm`.
HashSplitter::HashSplitter(SplitOptions options_) : ShuffleSplitter(options_)
{
    std::vector<size_t> hash_fields;
    appendColumnIndices(options_.hash_exprs, hash_fields);
    appendColumnIndices(options_.out_exprs, output_columns_indicies);

    selector_builder = std::make_unique<HashSelectorBuilder>(options.partition_num, hash_fields, options_.hash_algorithm);
}

std::unique_ptr<ShuffleSplitter> HashSplitter::create(const SplitOptions & options_)
{
    return std::make_unique<HashSplitter>(options_);
}

void HashSplitter::computeAndCountPartitionId(DB::Block & block)
{
    Stopwatch watch;
    partition_info = selector_builder->build(block);
    split_result.total_compute_pid_time += watch.elapsedNanoseconds();
}

ShuffleSplitterPtr RangeSplitter::create(const SplitOptions & options_)
{
    return std::make_unique<RangeSplitter>(options_);
}

/// Reads the output column positions from `out_exprs`; `hash_exprs` is passed unparsed
/// to the range selector builder.
RangeSplitter::RangeSplitter(const SplitOptions & options_) : ShuffleSplitter(options_)
{
    appendColumnIndices(options_.out_exprs, output_columns_indicies);
    selector_builder = std::make_unique<RangeSelectorBuilder>(options.hash_exprs, options.partition_num);
}

void RangeSplitter::computeAndCountPartitionId(DB::Block & block)
{
    Stopwatch watch;
    partition_info = selector_builder->build(block);
    split_result.total_compute_pid_time += watch.elapsedNanoseconds();
}
}