cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "SparkMergeTreeWriter.h"
#include <Disks/createVolume.h>
#include <Interpreters/ActionsDAG.h>
#include <rapidjson/prettywriter.h>
using namespace DB;
namespace local_engine
{
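
/// Column names in the incoming block may carry a '#'-separated suffix;
/// keep only the prefix before the first '#' so names match the table schema.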
Block removeColumnSuffix(const DB::Block & block)
{
    ColumnsWithTypeAndName columns;
    for (size_t i = 0; i < block.columns(); ++i)
    {
        auto column = block.getByPosition(i);
        Poco::StringTokenizer splits(column.name, "#");
        column.name = splits[0];
        columns.emplace_back(column);
    }
    return Block(columns);
}
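
/// Convert the incoming block to the writer's header layout, accumulate it in the
/// squashing transform, and write any block it emits as one or more MergeTree parts.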
void SparkMergeTreeWriter::write(DB::Block & block)
{
    auto new_block = removeColumnSuffix(block);
    auto converter = ActionsDAG::makeConvertingActions(
        new_block.getColumnsWithTypeAndName(), header.getColumnsWithTypeAndName(), DB::ActionsDAG::MatchColumnsMode::Position);
    if (converter)
    {
        auto do_convert = ExpressionActions(converter);
        do_convert.execute(new_block);
    }

    auto res = squashing_transform->add(new_block);
    if (res)
    {
        auto blocks_with_partition = MergeTreeDataWriter::splitBlockIntoParts(res, /*max_parts*/ 10, metadata_snapshot, context);
        for (auto & item : blocks_with_partition)
        {
            auto temp_part = writeTempPart(item, metadata_snapshot, context);
            temp_part.finalize();
            new_parts.emplace_back(temp_part.part);
            part_num++;
        }
    }
}
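
/// Flush whatever is still buffered in the squashing transform into final parts.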
void SparkMergeTreeWriter::finalize()
{
    auto block = squashing_transform->add({});
    if (block.rows())
    {
        auto blocks_with_partition = MergeTreeDataWriter::splitBlockIntoParts(block, /*max_parts*/ 10, metadata_snapshot, context);
        for (auto & item : blocks_with_partition)
        {
            auto temp_part = writeTempPart(item, metadata_snapshot, context);
            temp_part.finalize();
            new_parts.emplace_back(temp_part.part);
            /// Keep part directory names unique when the tail flush produces several parts.
            part_num++;
        }
    }
}
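
/// Write one partition's block as a temporary MergeTree part: sort it by the
/// sorting key if necessary, create the part directory, and stream the data
/// through MergedBlockOutputStream.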
MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart(
    BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
    MergeTreeDataWriter::TemporaryPart temp_part;
    Block & block = block_with_partition.block;

    auto columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
    for (auto & column : columns)
        if (column.type->hasDynamicSubcolumns())
            column.type = block.getByName(column.name).type;

    auto minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
    minmax_idx->update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));

    MergeTreePartition partition(block_with_partition.partition);
    MergeTreePartInfo new_part_info(partition.getID(metadata_snapshot->getPartitionKey().sample_block), 1, 1, 0);
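
    /// Relative part directory: [partition/][bucket/]<uuid>_<part_num>.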
    std::string part_dir;
    if (!partition_dir.empty() && !bucket_dir.empty())
        part_dir = fmt::format("{}/{}/{}_{:03d}", partition_dir, bucket_dir, uuid, part_num);
    else if (!partition_dir.empty())
        part_dir = fmt::format("{}/{}_{:03d}", partition_dir, uuid, part_num);
    else if (!bucket_dir.empty())
        part_dir = fmt::format("{}/{}_{:03d}", bucket_dir, uuid, part_num);
    else
        part_dir = fmt::format("{}_{:03d}", uuid, part_num);
    String part_name = part_dir;

    temp_part.temporary_directory_lock = storage.getTemporaryPartDirectoryHolder(part_dir);

    auto indices = MergeTreeIndexFactory::instance().getMany(metadata_snapshot->getSecondaryIndices());

    /// If we need to calculate some columns to sort.
    if (metadata_snapshot->hasSortingKey() || metadata_snapshot->hasSecondaryIndices())
        storage.getSortingKeyAndSkipIndicesExpression(metadata_snapshot, indices)->execute(block);

    Names sort_columns = metadata_snapshot->getSortingKeyColumns();
    SortDescription sort_description;
    size_t sort_columns_size = sort_columns.size();
    sort_description.reserve(sort_columns_size);
    for (size_t i = 0; i < sort_columns_size; ++i)
        sort_description.emplace_back(sort_columns[i], 1, 1);
    /// Sort
    IColumn::Permutation * perm_ptr = nullptr;
    IColumn::Permutation perm;
    if (!sort_description.empty())
    {
        if (!isAlreadySorted(block, sort_description))
        {
            stableGetPermutation(block, sort_description, perm);
            perm_ptr = &perm;
        }
    }
    Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;

    /// Size of part would not be greater than block.bytes() + epsilon
    size_t expected_size = block.bytes();

    /// If optimize_on_insert is true, the block may become empty after merge.
    /// There is no need to create an empty part.
    if (expected_size == 0)
        return temp_part;
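
    /// Place the new part on the first volume of the storage policy.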
    VolumePtr volume = storage.getStoragePolicy()->getVolume(0);
    VolumePtr data_part_volume = std::make_shared<SingleDiskVolume>(volume->getName(), volume->getDisk(), volume->max_data_part_size);

    auto new_data_part = storage.getDataPartBuilder(part_name, data_part_volume, part_dir)
                             .withPartFormat(storage.choosePartFormat(expected_size, block.rows()))
                             .withPartInfo(new_part_info)
                             .build();

    auto data_part_storage = new_data_part->getDataPartStoragePtr();

    const auto & data_settings = storage.getSettings();

    SerializationInfo::Settings settings{data_settings->ratio_of_defaults_for_sparse_serialization, true};
    SerializationInfoByName infos(columns, settings);
    infos.add(block);

    new_data_part->setColumns(columns, infos, metadata_snapshot->getMetadataVersion());
    new_data_part->rows_count = block.rows();
    new_data_part->partition = std::move(partition);
    new_data_part->minmax_idx = std::move(minmax_idx);
    SyncGuardPtr sync_guard;
    if (new_data_part->isStoredOnDisk())
    {
        /// The name could be non-unique in case of stale files from previous runs.
        String full_path = new_data_part->getDataPartStorage().getFullPath();

        if (new_data_part->getDataPartStorage().exists())
        {
            // LOG_WARNING(log, "Removing old temporary directory {}", full_path);
            data_part_storage->removeRecursive();
        }

        data_part_storage->createDirectories();

        if (storage.getSettings()->fsync_part_directory)
        {
            const auto disk = data_part_volume->getDisk();
            sync_guard = disk->getDirectorySyncGuard(full_path);
        }
    }
    /// This effectively chooses minimal compression method:
    /// either default lz4 or compression method with zero thresholds on absolute and relative part size.
    auto compression_codec = storage.getContext()->chooseCompressionCodec(0, 0);

    auto out = std::make_unique<MergedBlockOutputStream>(
        new_data_part,
        metadata_snapshot,
        columns,
        indices,
        MergeTreeStatisticsFactory::instance().getMany(metadata_snapshot->getColumns()),
        compression_codec,
        context->getCurrentTransaction(),
        false,
        false,
        context->getWriteSettings());

    out->writeWithPermutation(block, perm_ptr);

    auto finalizer = out->finalizePartAsync(new_data_part, data_settings->fsync_after_insert, nullptr, nullptr);
    temp_part.part = new_data_part;
    temp_part.streams.emplace_back(MergeTreeDataWriter::TemporaryPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)});

    return temp_part;
}
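
/// Collect name, mark count, size on disk and row count for every part written so far.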
std::vector<PartInfo> SparkMergeTreeWriter::getAllPartInfo()
{
    std::vector<PartInfo> res;
    for (const MergeTreeDataPartPtr & part : new_parts)
        res.emplace_back(
            PartInfo{part->name, part->getMarksCount(), part->getBytesOnDisk(), part->rows_count, partition_values, bucket_dir});
    return res;
}
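
/// Serialize the collected part infos into a JSON array string.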
String SparkMergeTreeWriter::partInfosToJson(const std::vector<PartInfo> & part_infos)
{
    rapidjson::StringBuffer result;
    rapidjson::Writer<rapidjson::StringBuffer> writer(result);
    writer.StartArray();
    for (const auto & item : part_infos)
    {
        writer.StartObject();
        writer.Key("part_name");
        writer.String(item.part_name.c_str());
        writer.Key("mark_count");
        writer.Uint64(item.mark_count);
        writer.Key("disk_size");
        writer.Uint64(item.disk_size);
        writer.Key("row_count");
        writer.Uint64(item.row_count);
        writer.Key("bucket_id");
        writer.String(item.bucket_id.c_str());
        writer.Key("partition_values");
        writer.StartObject();
        for (const auto & [key, value] : item.partition_values)
        {
            writer.Key(key.c_str());
            writer.String(value.c_str());
        }
        writer.EndObject();
        writer.EndObject();
    }
    writer.EndArray();
    return result.GetString();
}
}