cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h (76 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <Interpreters/Context.h> #include <Interpreters/SquashingTransform.h> #include <Storages/MergeTree/MergeTreeDataWriter.h> #include <Storages/MergeTree/IMergeTreeDataPart.h> #include <Poco/StringTokenizer.h> namespace DB { struct BlockWithPartition; class MergeTreeData; struct StorageSnapshot; using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>; } namespace local_engine { struct PartInfo { String part_name; size_t mark_count; size_t disk_size; size_t row_count; std::unordered_map<String, String> partition_values; String bucket_id; }; class SparkMergeTreeWriter { public: static String partInfosToJson(const std::vector<PartInfo> & part_infos); SparkMergeTreeWriter( DB::MergeTreeData & storage_, const DB::StorageMetadataPtr & metadata_snapshot_, const DB::ContextPtr & context_, const String & uuid_, const String & partition_dir_ = "", const String & bucket_dir_ = "") : storage(storage_) , metadata_snapshot(metadata_snapshot_) , context(context_) , uuid(uuid_) , partition_dir(partition_dir_) , bucket_dir(bucket_dir_) { const DB::Settings & settings = context->getSettingsRef(); squashing_transform = std::make_unique<DB::SquashingTransform>(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); if (!partition_dir.empty()) { Poco::StringTokenizer partitions(partition_dir, "/"); for (const auto & partition : partitions) { Poco::StringTokenizer key_value(partition, "="); chassert(key_value.count() == 2); partition_values.emplace(key_value[0], key_value[1]); } } header = metadata_snapshot->getSampleBlock(); } void write(DB::Block & block); void finalize(); std::vector<PartInfo> getAllPartInfo(); private: DB::MergeTreeDataWriter::TemporaryPart writeTempPart(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot, DB::ContextPtr context); String uuid; String partition_dir; String bucket_dir; DB::MergeTreeData & storage; DB::StorageMetadataPtr metadata_snapshot; DB::ContextPtr context; std::unique_ptr<DB::SquashingTransform> squashing_transform; int part_num = 1; std::vector<DB::MergeTreeDataPartPtr> new_parts; std::unordered_map<String, String> partition_values; DB::Block header; }; }