cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp (86 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "SubstraitFileSource.h"
#include <Storages/Parquet/ColumnIndexFilter.h>
#include <Storages/SubstraitSource/FileReader.h>
#include <Storages/SubstraitSource/FormatFile.h>
#include <Poco/URI.h>
#include <Common/CHUtil.h>
namespace local_engine
{
/// Creates one FormatFile for every entry listed in `file_infos`.
///
/// A single read buffer builder is created from the URI scheme of the first entry
/// and is shared by all files produced here.
///
/// @param file_infos  Substrait description of the files to read.
/// @param context     Context forwarded to the builder factory and to FormatFileUtil::createFile.
/// @return One FormatFilePtr per entry, in the same order; empty when there are no entries.
static std::vector<FormatFilePtr> initializeFiles(const substrait::ReadRel::LocalFiles & file_infos, const DB::ContextPtr & context)
{
    const auto & items = file_infos.items();
    if (items.empty())
        return {};

    std::vector<FormatFilePtr> files;
    /// The final element count is known, so allocate once instead of growing per emplace_back.
    files.reserve(static_cast<std::size_t>(items.size()));

    const Poco::URI file_uri(items.Get(0).uri_file());
    ReadBufferBuilderPtr read_buffer_builder = ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(), context);
    for (const auto & item : items)
        files.emplace_back(FormatFileUtil::createFile(context, read_buffer_builder, item));
    return files;
}
/// Derives the header that is actually read from the files.
///
/// Starting from `block`, every column whose name is a partition key of the first file,
/// or that the first file's meta-column set reports as a virtual column, is dropped.
/// When `files` is empty, `block` is returned unchanged.
static DB::Block initReadHeader(const DB::Block & block, const FormatFiles & files)
{
    if (files.empty())
        return block;

    const auto & first_file = files[0];
    const auto & partition_values = first_file->getFilePartitionValues();
    const auto & meta_columns = first_file->fileMetaColumns();

    DB::ColumnsWithTypeAndName kept_columns;
    for (const auto & column : block.getColumnsWithTypeAndName())
    {
        /// Partition keys are checked first; the virtual-column lookup only runs for non-partition columns.
        if (partition_values.contains(column.name) || meta_columns.virtualColumn(column.name))
            continue;
        kept_columns.push_back(column);
    }
    return kept_columns;
}
/// Constructs a source over the files described by `file_infos`.
///
/// @param context_       Context used while creating the per-file objects (see initializeFiles).
/// @param outputHeader_  Header requested by the caller; stored as `outputHeader`.
/// @param file_infos     Substrait description of the files to read.
///
/// The header handed to the base class is whatever BaseReader::buildRowCountHeader
/// produces from `outputHeader_`.
SubstraitFileSource::SubstraitFileSource(
const DB::ContextPtr & context_, const DB::Block & outputHeader_, const substrait::ReadRel::LocalFiles & file_infos)
: DB::SourceWithKeyCondition(BaseReader::buildRowCountHeader(outputHeader_), false)
, files(initializeFiles(file_infos, context_))
, outputHeader(outputHeader_)
/// readHeader is computed from the *members* `outputHeader` and `files`, so both must be
/// declared before `readHeader` in the class (members are initialized in declaration order).
/// NOTE(review): the class declaration is not visible here — confirm the order in the header.
, readHeader(initReadHeader(outputHeader, files))
{
}
/// Defaulted here in the .cpp instead of in the header.
/// NOTE(review): presumably so that member types only forward-declared in the header
/// are complete where the destructor is instantiated — confirm against SubstraitFileSource.h.
SubstraitFileSource::~SubstraitFileSource() = default;
/// Installs the key condition for this source and, when a filter DAG is supplied,
/// also builds the column index filter from it.
///
/// The key condition is always set against `readHeader`. If `filter_actions_dag` is empty,
/// `column_index_filter` is left as it was.
void SubstraitFileSource::setKeyCondition(const std::optional<DB::ActionsDAG> & filter_actions_dag, DB::ContextPtr context_)
{
    setKeyConditionImpl(filter_actions_dag, context_, readHeader);

    /// No filter supplied: nothing to build a column index filter from.
    if (!filter_actions_dag.has_value())
        return;

    column_index_filter = std::make_shared<ColumnIndexFilter>(*filter_actions_dag, context_);
}
/// Produces the next chunk of data.
///
/// Pulls from the current file reader; when that reader has nothing more to give it is
/// dropped and the next file is tried. An empty chunk is returned once no reader can be
/// prepared (the source was cancelled or every file has been consumed).
DB::Chunk SubstraitFileSource::generate()
{
    while (tryPrepareReader())
    {
        DB::Chunk chunk;
        if (file_reader->pull(chunk))
            return chunk;

        /// Current file is exhausted; release its reader so the next iteration opens the next file.
        file_reader.reset();
    }

    /// No reader available any more.
    return {};
}
/// Makes sure `file_reader` points at a reader that can be pulled from.
///
/// @return true if a reader is available (either the existing one or one newly created
///         for the next eligible file); false if the source is cancelled or no remaining
///         file yields a reader.
bool SubstraitFileSource::tryPrepareReader()
{
    if (isCancelled())
        return false;

    /// A reader from a previous call is still open: keep using it.
    if (file_reader)
        return true;

    const auto file_count = files.size();
    while (!file_reader && current_file_index < file_count)
    {
        auto candidate = files[current_file_index++];

        /// A file that cannot be split is not read by a task whose start offset is non-zero;
        /// such a task contributes no data for this file.
        const bool skip_file = !candidate->supportSplit() && candidate->getStartOffset();
        if (skip_file)
            continue;

        file_reader = BaseReader::create(candidate, readHeader, outputHeader, key_condition, column_index_filter);
    }

    return file_reader != nullptr;
}
/// Cancellation hook: forwards the cancel request to the currently open file reader, if any.
void SubstraitFileSource::onCancel() noexcept
{
    /// Nothing to cancel when no reader is open.
    if (!file_reader)
        return;

    file_reader->cancel();
}
}