cpp/velox/memory/VeloxColumnarBatch.cc (135 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "VeloxColumnarBatch.h"

#include <memory>
#include <string>
#include <vector>

#include "compute/VeloxRuntime.h"
#include "utils/Timer.h"
#include "utils/VeloxArrowUtils.h"
#include "velox/row/UnsafeRowFast.h"
#include "velox/type/Type.h"
#include "velox/vector/FlatVector.h"

namespace gluten {

using namespace facebook;
using namespace facebook::velox;

namespace {

/// Builds a RowVector whose row type is derived from `childNames` and the
/// types of `children`.
///
/// @param pool       Memory pool the new RowVector is created in.
/// @param numRows    Row count of the resulting vector.
/// @param childNames Column names, one per entry of `children`.
/// @param nulls      Top-level null buffer; may be nullptr.
/// @param children   Column vectors. Taken by value so callers can move them
///                   in; they are then moved into the RowVector (a `const&`
///                   parameter would silently turn `std::move` into a copy).
/// @return The newly created RowVector.
RowVectorPtr makeRowVector(
    velox::memory::MemoryPool* pool,
    int32_t numRows,
    std::vector<std::string> childNames,
    BufferPtr nulls,
    std::vector<VectorPtr> children) {
  std::vector<std::shared_ptr<const Type>> childTypes;
  childTypes.reserve(children.size());
  for (const auto& child : children) {
    childTypes.push_back(child->type());
  }
  auto rowType = ROW(std::move(childNames), std::move(childTypes));
  return std::make_shared<RowVector>(pool, std::move(rowType), std::move(nulls), numRows, std::move(children));
}

} // namespace

/// Flattens every child of the wrapped row vector in place. This runs at most
/// once per batch (guarded by `flattened_`), and its duration is measured by a
/// ScopedTimer bound to `exportNanos_`.
void VeloxColumnarBatch::ensureFlattened() {
  if (flattened_) {
    return;
  }
  ScopedTimer timer(&exportNanos_);
  for (auto& child : rowVector_->children()) {
    facebook::velox::BaseVector::flattenVector(child);
    if (child->isLazy()) {
      // Replace the lazy wrapper by the vector it has loaded.
      child = child->as<facebook::velox::LazyVector>()->loadedVectorShared();
      VELOX_DCHECK_NOT_NULL(child);
    }
    // In case of output from Limit, RowVector size can be smaller than its children size.
    if (child->size() > rowVector_->size()) {
      child = child->slice(0, rowVector_->size());
    }
  }
  flattened_ = true;
}

/// Flattens the batch and exports its schema through the Arrow C data interface.
std::shared_ptr<ArrowSchema> VeloxColumnarBatch::exportArrowSchema() {
  auto out = std::make_shared<ArrowSchema>();
  ensureFlattened();
  velox::exportToArrow(rowVector_, *out, ArrowUtils::getBridgeOptions());
  return out;
}

/// Flattens the batch and exports its data through the Arrow C data interface,
/// using the row vector's own memory pool.
std::shared_ptr<ArrowArray> VeloxColumnarBatch::exportArrowArray() {
  auto out = std::make_shared<ArrowArray>();
  ensureFlattened();
  velox::exportToArrow(rowVector_, *out, rowVector_->pool(), ArrowUtils::getBridgeOptions());
  return out;
}

/// Flattens the batch and returns `estimateFlatSize()` of the row vector.
int64_t VeloxColumnarBatch::numBytes() {
  ensureFlattened();
  return rowVector_->estimateFlatSize();
}

/// Returns the wrapped row vector as-is (children may still be encoded or lazy).
velox::RowVectorPtr VeloxColumnarBatch::getRowVector() const {
  return rowVector_;
}

/// Returns the wrapped row vector after flattening its children.
velox::RowVectorPtr VeloxColumnarBatch::getFlattenedRowVector() {
  ensureFlattened();
  return rowVector_;
}

/// Returns `cb` itself when it is already a Velox batch; otherwise round-trips
/// it through Arrow and wraps the imported row vector.
///
/// @param pool Memory pool used for the Arrow import.
/// @param cb   Batch to convert.
/// @throws GlutenException if the imported vector is not a RowVector.
std::shared_ptr<VeloxColumnarBatch> VeloxColumnarBatch::from(
    facebook::velox::memory::MemoryPool* pool,
    std::shared_ptr<ColumnarBatch> cb) {
  if (cb->getType() == "velox") {
    return std::dynamic_pointer_cast<VeloxColumnarBatch>(cb);
  }
  auto vp = velox::importFromArrowAsOwner(*cb->exportArrowSchema(), *cb->exportArrowArray(), pool);
  auto rv = std::dynamic_pointer_cast<velox::RowVector>(vp);
  // Fail loudly instead of wrapping a null row vector.
  GLUTEN_CHECK(rv != nullptr, "Vector imported from Arrow is not a row vector");
  return std::make_shared<VeloxColumnarBatch>(std::move(rv));
}

/// Horizontally concatenates the columns of several Velox batches into one.
///
/// Every input must be a Velox batch, all inputs must have the same, positive
/// row count, and no input row vector may carry top-level null bits.
///
/// @param pool    Memory pool for the composed RowVector.
/// @param batches Batches whose columns are concatenated, in order.
/// @throws GlutenException if any of the conditions above is violated.
std::shared_ptr<VeloxColumnarBatch> VeloxColumnarBatch::compose(
    facebook::velox::memory::MemoryPool* pool,
    const std::vector<std::shared_ptr<ColumnarBatch>>& batches) {
  GLUTEN_CHECK(!batches.empty(), "No batches to compose");

  // Validate every input -- including the first one, which the row-count
  // comparison would otherwise skip -- and keep its row vector for later.
  int32_t numRows = -1;
  std::vector<RowVectorPtr> rowVectors;
  rowVectors.reserve(batches.size());
  for (const auto& batch : batches) {
    GLUTEN_CHECK(batch->getType() == kType, "At least one of the input batches is not in Velox format");
    if (numRows == -1) {
      numRows = batch->numRows();
    } else if (batch->numRows() != numRows) {
      throw GlutenException("Mismatched row counts among the input batches during composing columnar batches");
    }
    auto vb = std::dynamic_pointer_cast<VeloxColumnarBatch>(batch);
    GLUTEN_CHECK(vb != nullptr, "At least one of the input batches is not in Velox format");
    auto rv = vb->getRowVector();
    GLUTEN_CHECK(rv->nulls() == nullptr, "Vectors to compose contain null bits");
    rowVectors.push_back(std::move(rv));
  }
  GLUTEN_CHECK(numRows > 0, "Illegal state");

  std::vector<std::string> childNames;
  std::vector<VectorPtr> children;
  for (const auto& rv : rowVectors) {
    const auto& names = rv->type()->asRow().names();
    childNames.insert(childNames.end(), names.begin(), names.end());
    const auto& vectors = rv->children();
    children.insert(children.end(), vectors.begin(), vectors.end());
  }
  RowVectorPtr outRv = makeRowVector(pool, numRows, std::move(childNames), nullptr, std::move(children));
  return std::make_shared<VeloxColumnarBatch>(outRv);
}

/// Returns a new batch containing only the columns at `columnIndices`, in that
/// order, sharing the child vectors and top-level nulls of this batch.
std::shared_ptr<VeloxColumnarBatch> VeloxColumnarBatch::select(
    facebook::velox::memory::MemoryPool* pool,
    const std::vector<int32_t>& columnIndices) {
  std::vector<std::string> childNames;
  std::vector<VectorPtr> childVectors;
  childNames.reserve(columnIndices.size());
  childVectors.reserve(columnIndices.size());
  auto type = facebook::velox::asRowType(rowVector_->type());
  for (const auto index : columnIndices) {
    childNames.push_back(type->nameOf(index));
    childVectors.push_back(rowVector_->childAt(index));
  }
  auto rowVector = makeRowVector(pool, numRows(), std::move(childNames), rowVector_->nulls(), std::move(childVectors));
  return std::make_shared<VeloxColumnarBatch>(rowVector);
}

/// Serializes the row at `rowId` into Spark UnsafeRow bytes.
///
/// @param rowId Zero-based row index; must be within [0, rowVector_->size()).
/// @return A zero-initialized buffer of `rowSize(rowId)` bytes holding the row.
/// @throws GlutenException if `rowId` is out of range.
std::vector<char> VeloxColumnarBatch::toUnsafeRow(int32_t rowId) const {
  GLUTEN_CHECK(rowId >= 0 && rowId < rowVector_->size(), "Row index out of range for UnsafeRow conversion");
  facebook::velox::row::UnsafeRowFast fast(rowVector_);
  const auto size = fast.rowSize(rowId);
  // std::vector value-initializes its elements, so the buffer is already zeroed.
  std::vector<char> bytes(size);
  // Serialize the requested row (previously row 0 was always written here).
  fast.serialize(rowId, bytes.data());
  return bytes;
}

} // namespace gluten