cpp/src/reader/block/single_device_tsblock_reader.h (129 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef READER_SINGLE_DEVICE_TSBLOCK_READER_H
#define READER_SINGLE_DEVICE_TSBLOCK_READER_H
#include "common/tsblock/tsblock.h"
#include "reader/block/tsblock_reader.h"
#include "reader/filter/filter.h"
#include "reader/imeta_data_querier.h"
#include "reader/task/device_query_task.h"
namespace storage {
// Forward declarations for types that the reader below only refers to by
// pointer or as a container element before their definitions appear.
class DeviceQueryTask;
class IdColumnContext;
class MeasurementColumnContext;
/**
 * TsBlockReader implementation driven by a single DeviceQueryTask.
 *
 * Results are exposed through has_next()/next() as common::TsBlock objects.
 * The methods returning int report a status code (presumably common::E_OK on
 * success - confirm against the implementation file).
 *
 * NOTE(review): the class holds raw pointers and its destructor calls
 * close(); copying an instance is therefore presumably unsafe. Copy
 * operations are left as-is here to keep the interface unchanged.
 */
class SingleDeviceTsBlockReader : public TsBlockReader {
   public:
    /**
     * @param device_query_task query task describing the device to read
     * @param block_size        requested size of each produced block
     *                          (unit not visible here - confirm rows vs bytes)
     * @param metadata_querier  metadata lookup used during construction
     * @param tsfile_io_reader  reader used to access the underlying file
     * @param time_filter       filter on the time column
     * @param field_filter      filter on field values
     */
    explicit SingleDeviceTsBlockReader(DeviceQueryTask* device_query_task,
                                       uint32_t block_size,
                                       IMetadataQuerier* metadata_querier,
                                       TsFileIOReader* tsfile_io_reader,
                                       Filter* time_filter,
                                       Filter* field_filter);

    // close() runs unconditionally on destruction, which is why every pointer
    // member below carries a default initializer: close() must never observe
    // an indeterminate value, even if construction/init() stopped early.
    ~SingleDeviceTsBlockReader() { close(); }

    /** Reports through @p has_next whether another block is available. */
    int has_next(bool& has_next) override;

    /** Hands the next block to the caller through @p ret_block. */
    int next(common::TsBlock*& ret_block) override;

    /** (Re)initialises the reader for the given task, block size and filters. */
    int init(DeviceQueryTask* device_query_task, uint32_t block_size,
             Filter* time_filter, Filter* field_filter);

    /** Releases the reader's resources; also invoked by the destructor. */
    void close() override;

   private:
    int construct_column_context(const ITimeseriesIndex* time_series_index,
                                 Filter* time_filter);
    int fill_measurements(
        std::vector<MeasurementColumnContext*>& column_contexts);
    int fill_ids();
    int advance_column(MeasurementColumnContext* column_context);

    DeviceQueryTask* device_query_task_ = nullptr;
    Filter* field_filter_ = nullptr;
    uint32_t block_size_ = 0;
    common::TsBlock* current_block_ = nullptr;
    std::vector<common::ColAppender*> col_appenders_;
    common::RowAppender* row_appender_ = nullptr;
    common::TupleDesc tuple_desc_;
    bool last_block_returned_ = true;
    // Per-measurement read state, keyed by a string (presumably the column
    // name - confirm against the implementation).
    std::map<std::string, MeasurementColumnContext*> field_column_contexts_;
    std::map<std::string, IdColumnContext> id_column_contexts_;
    int64_t next_time_ = 0;
    int64_t time_column_index_ = 0;
    TsFileIOReader* tsfile_io_reader_ = nullptr;
    common::PageArena pa_;
};
class MeasurementColumnContext {
public:
explicit MeasurementColumnContext(TsFileIOReader* tsfile_io_reader)
: tsfile_io_reader_(tsfile_io_reader) {}
virtual ~MeasurementColumnContext() = default;
virtual void fill_into(
std::vector<common::ColAppender*>& col_appenders) = 0;
virtual void remove_from(std::map<std::string, MeasurementColumnContext*>&
column_context_map) = 0;
virtual int get_next_tsblock(bool alloc_mem) = 0;
virtual int get_current_time(int64_t& time) = 0;
virtual int get_current_value(char*& value, uint32_t& len) = 0;
virtual int move_iter() = 0;
protected:
TsFileIOReader* tsfile_io_reader_;
TsFileSeriesScanIterator* ssi_ = nullptr;
common::TsBlock* tsblock_ = nullptr;
common::ColIterator* time_iter_ = nullptr;
common::ColIterator* value_iter_ = nullptr;
};
class SingleMeasurementColumnContext final : public MeasurementColumnContext {
public:
explicit SingleMeasurementColumnContext(TsFileIOReader* tsfile_io_reader)
: MeasurementColumnContext(tsfile_io_reader) {}
~SingleMeasurementColumnContext() override {
if (time_iter_) {
delete time_iter_;
time_iter_ = nullptr;
}
if (value_iter_) {
delete value_iter_;
value_iter_ = nullptr;
}
if (ssi_) {
ssi_->revert_tsblock();
}
tsfile_io_reader_->revert_ssi(ssi_);
ssi_ = nullptr;
}
void fill_into(std::vector<common::ColAppender*>& col_appenders) override;
void remove_from(std::map<std::string, MeasurementColumnContext*>&
column_context_map) override;
int init(DeviceQueryTask* device_query_task,
const ITimeseriesIndex* time_series_index, Filter* time_filter,
const std::vector<int32_t>& pos_in_result,
common::PageArena& pa);
int get_next_tsblock(bool alloc_mem) override;
int get_current_time(int64_t& time) override;
int get_current_value(char*& value, uint32_t& len) override;
int move_iter() override;
private:
std::string column_name_;
std::vector<int32_t> pos_in_result_;
};
class VectorMeasurementColumnContext final : public MeasurementColumnContext {
public:
explicit VectorMeasurementColumnContext(TsFileIOReader* tsfile_io_reader)
: MeasurementColumnContext(tsfile_io_reader) {}
void fill_into(std::vector<common::ColAppender*>& col_appenders) override;
void remove_from(std::map<std::string, MeasurementColumnContext*>&
column_context_map) override;
int init(DeviceQueryTask* device_query_task,
const ITimeseriesIndex* time_series_index, Filter* time_filter,
std::vector<std::vector<int32_t>>& pos_in_result,
common::PageArena& pa);
int get_next_tsblock(bool alloc_mem) override;
int get_current_time(int64_t& time) override;
int get_current_value(char*& value, uint32_t& len) override;
int move_iter() override;
private:
std::vector<std::vector<int32_t>> pos_in_result_;
};
/**
 * Immutable description of an id column: the positions associated with it in
 * the result and its position inside the device id. Both members are const
 * and are copied from the constructor arguments.
 */
class IdColumnContext {
   public:
    explicit IdColumnContext(const std::vector<int32_t>& pos_in_result,
                             int32_t pos_in_device_id)
        : pos_in_result_(pos_in_result),
          pos_in_device_id_{pos_in_device_id} {}

    // Copy of the positions handed to the constructor.
    const std::vector<int32_t> pos_in_result_;
    // Position of this column within the device id.
    const int32_t pos_in_device_id_;
};
} // namespace storage
#endif // READER_SINGLE_DEVICE_TSBLOCK_READER_H