cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h (85 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <Columns/IColumn.h> #include <Core/Block.h> #include <Core/ColumnsWithTypeAndName.h> #include <Core/Field.h> #include <Interpreters/Context.h> #include <Processors/Chunk.h> #include <Processors/Executors/PullingPipelineExecutor.h> #include <Processors/ISource.h> #include <QueryPipeline/QueryPipeline.h> #include <Storages/SubstraitSource/FormatFile.h> #include <Storages/SubstraitSource/ReadBufferBuilder.h> #include <Storages/SubstraitSource/SubstraitFileSourceStep.h> #include <base/types.h> namespace local_engine { class FileReaderWrapper { public: explicit FileReaderWrapper(FormatFilePtr file_) : file(file_) { } virtual ~FileReaderWrapper() = default; virtual bool pull(DB::Chunk & chunk) = 0; virtual void applyKeyCondition(std::shared_ptr<const DB::KeyCondition> /*key_condition*/) { } protected: FormatFilePtr file; static DB::ColumnPtr createConstColumn(DB::DataTypePtr type, const DB::Field & field, size_t rows); static DB::ColumnPtr createColumn(const String & value, DB::DataTypePtr type, size_t rows); static DB::Field buildFieldFromString(const String & value, DB::DataTypePtr type); }; class NormalFileReader : public FileReaderWrapper { public: NormalFileReader(FormatFilePtr file_, DB::ContextPtr context_, const DB::Block & to_read_header_, const DB::Block & output_header_); ~NormalFileReader() override = default; bool pull(DB::Chunk & chunk) override; void applyKeyCondition(std::shared_ptr<const DB::KeyCondition> key_condition) override { input_format->input->setKeyCondition(key_condition); } private: DB::ContextPtr context; DB::Block to_read_header; DB::Block output_header; FormatFile::InputFormatPtr input_format; }; class EmptyFileReader : public FileReaderWrapper { public: explicit EmptyFileReader(FormatFilePtr file_) : FileReaderWrapper(file_) { } ~EmptyFileReader() override = default; bool pull(DB::Chunk &) override { return false; } }; class ConstColumnsFileReader : public FileReaderWrapper { public: ConstColumnsFileReader( FormatFilePtr file_, DB::ContextPtr context_, const DB::Block & header_, size_t block_size_ = DB::DEFAULT_BLOCK_SIZE); ~ConstColumnsFileReader() override = default; bool pull(DB::Chunk & chunk) override; private: DB::ContextPtr context; DB::Block header; size_t remained_rows; size_t block_size; }; class SubstraitFileSource : public DB::SourceWithKeyCondition { public: SubstraitFileSource(DB::ContextPtr context_, const DB::Block & header_, const substrait::ReadRel::LocalFiles & file_infos); ~SubstraitFileSource() override = default; String getName() const override { return "SubstraitFileSource"; } void setKeyCondition(const DB::ActionsDAG::NodeRawConstPtrs & nodes, DB::ContextPtr context_) override; protected: DB::Chunk generate() override; private: DB::ContextPtr context; DB::Block output_header; /// Sample header may contains partitions keys DB::Block to_read_header; // Sample header not include partition keys FormatFiles files; UInt32 current_file_index = 0; std::unique_ptr<FileReaderWrapper> file_reader; ReadBufferBuilderPtr read_buffer_builder; bool tryPrepareReader(); }; }