cpp/core/benchmarks/CompressionBenchmark.cc

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <arrow/extension_type.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/io/interfaces.h>
#include <arrow/ipc/options.h>
#include <arrow/memory_pool.h>
#include <arrow/record_batch.h>
#include <arrow/type.h>
#include <arrow/type_fwd.h>
#include <benchmark/benchmark.h>
#include <execinfo.h>
#include <glog/logging.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>
#include <sched.h>

#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <utility>

#include "shuffle/ShuffleWriter.h"
#include "utils/Compression.h"
#include "utils/macros.h"

// Print the current call stack (debugging aid).
void printTrace(void) {
  char** strings;
  size_t i, size;
  enum Constexpr { kMaxSize = 1024 };
  void* array[kMaxSize];
  size = backtrace(array, kMaxSize);
  strings = backtrace_symbols(array, size);
  for (i = 0; i < size; i++)
    printf(" %s\n", strings[i]);
  puts("");
  free(strings);
}

using arrow::RecordBatchReader;
using arrow::Status;
using gluten::GlutenException;
using gluten::ShuffleWriterOptions;

namespace gluten {

#define ALIGNMENT (2 * 1024 * 1024)

const int32_t kQatGzip = 0;
const int32_t kQatZstd = 1;
const int32_t kQplGzip = 2;
const int32_t kLZ4 = 3;
const int32_t kZstd = 4;

class BenchmarkCompression {
 public:
  explicit BenchmarkCompression(const std::string& fileName, uint32_t compressBufferSize) {
    getRecordBatchReader(fileName, compressBufferSize);
  }

  virtual std::string name() const = 0;

  void getRecordBatchReader(const std::string& inputFile, uint32_t compressBufferSize) {
    std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
    std::shared_ptr<RecordBatchReader> recordBatchReader;

    std::shared_ptr<arrow::fs::FileSystem> fs;
    std::string fileName;
    GLUTEN_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(inputFile, &fileName))

    GLUTEN_ASSIGN_OR_THROW(file_, fs->OpenInputFile(fileName));

    properties_.set_batch_size(compressBufferSize);
    properties_.set_pre_buffer(false);
    properties_.set_use_threads(false);

    GLUTEN_THROW_NOT_OK(::parquet::arrow::FileReader::Make(
        arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));

    GLUTEN_THROW_NOT_OK(parquetReader->GetSchema(&schema_));

    auto numRowgroups = parquetReader->num_row_groups();
    for (int i = 0; i < numRowgroups; ++i) {
      rowGroupIndices_.push_back(i);
    }

    auto numColumns = schema_->num_fields();
    for (int i = 0; i < numColumns; ++i) {
      columnIndices_.push_back(i);
    }
  }

  void operator()(benchmark::State& state) {
    setCpu(state.range(2) + state.thread_index());
    auto ipcWriteOptions = arrow::ipc::IpcWriteOptions::Defaults();
    ipcWriteOptions.use_threads = false;
    auto compressBufferSize = (uint32_t)state.range(1);
    auto compressionType = state.range(0);
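
    // Map the benchmark's codec argument onto an Arrow IPC codec. The software LZ4 and
    // Zstd paths are always available; the QAT and IAA (QPL) hardware-offload backends
    // are only compiled in when GLUTEN_ENABLE_QAT / GLUTEN_ENABLE_IAA are defined.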
    switch (compressionType) {
      case gluten::kLZ4: {
        ipcWriteOptions.codec = createArrowIpcCodec(arrow::Compression::LZ4_FRAME, CodecBackend::NONE);
        break;
      }
      case gluten::kZstd: {
        ipcWriteOptions.codec = createArrowIpcCodec(arrow::Compression::ZSTD, CodecBackend::NONE);
        break;
      }
#ifdef GLUTEN_ENABLE_QAT
      case gluten::kQatGzip: {
        ipcWriteOptions.codec = createArrowIpcCodec(arrow::Compression::GZIP, CodecBackend::QAT);
        break;
      }
      case gluten::kQatZstd: {
        ipcWriteOptions.codec = createArrowIpcCodec(arrow::Compression::ZSTD, CodecBackend::QAT);
        LOG(INFO) << "load qatzstd";
        break;
      }
#endif
#ifdef GLUTEN_ENABLE_IAA
      case gluten::kQplGzip: {
        ipcWriteOptions.codec = createArrowIpcCodec(arrow::Compression::GZIP, CodecBackend::IAA);
        break;
      }
#endif
      default:
        throw GlutenException("Codec not supported. Supported codecs: LZ4, Zstd, QAT gzip/zstd and QPL gzip.");
    }
    ipcWriteOptions.memory_pool = arrow::default_memory_pool();

    int64_t elapseRead = 0;
    int64_t numBatches = 0;
    int64_t numRows = 0;
    int64_t compressTime = 0;
    int64_t decompressTime = 0;
    int64_t uncompressedSize = 0;
    int64_t compressedSize = 0;

    auto startTime = std::chrono::steady_clock::now();
    doCompress(
        elapseRead,
        numBatches,
        numRows,
        compressTime,
        decompressTime,
        uncompressedSize,
        compressedSize,
        ipcWriteOptions,
        state);
    auto endTime = std::chrono::steady_clock::now();
    auto totalTime = (endTime - startTime).count();
    LOG(INFO) << "Thread " << state.thread_index() << " took " << (1.0 * totalTime / 1e9) << "s";

    state.counters["rowgroups"] = benchmark::Counter(
        rowGroupIndices_.size(), benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["columns"] = benchmark::Counter(
        columnIndices_.size(), benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["batches"] =
        benchmark::Counter(numBatches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["num_rows"] =
        benchmark::Counter(numRows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["batch_buffer_size"] =
        benchmark::Counter(compressBufferSize, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1024);
    state.counters["parquet_parse"] =
        benchmark::Counter(elapseRead, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["compress_time"] =
        benchmark::Counter(compressTime, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["decompress_time"] =
        benchmark::Counter(decompressTime, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["total_time"] =
        benchmark::Counter(totalTime, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
    state.counters["uncompressed_size"] =
        benchmark::Counter(uncompressedSize, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1024);
    state.counters["compressed_size"] =
        benchmark::Counter(compressedSize, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1024);

    auto compressionRatio = 1.0 * compressedSize / uncompressedSize;
    state.counters["compression_ratio"] =
        benchmark::Counter(compressionRatio, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
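
    // Counters flagged kAvgThreads are divided by the number of benchmark threads in the
    // report, while kDefaults counters are accumulated across threads, which is why the
    // throughput below is exported both as a total and as a per-thread value.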
    // compressTime is in nanoseconds and uncompressedSize in bytes: scale by 1e9 to get
    // bytes per second, then by 8 to report bits per second.
    auto throughput = 1.0 * uncompressedSize / compressTime * 1e9 * 8;
    state.counters["throughput_total"] =
        benchmark::Counter(throughput, benchmark::Counter::kDefaults, benchmark::Counter::OneK::kIs1024);
    state.counters["throughput_per_thread"] =
        benchmark::Counter(throughput, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1024);
  }

 protected:
  long setCpu(uint32_t cpuindex) {
    cpu_set_t cs;
    CPU_ZERO(&cs);
    CPU_SET(cpuindex, &cs);
    return sched_setaffinity(0, sizeof(cs), &cs);
  }

  virtual void doCompress(
      int64_t& elapseRead,
      int64_t& numBatches,
      int64_t& numRows,
      int64_t& compressTime,
      int64_t& decompressTime,
      int64_t& uncompressedSize,
      int64_t& compressedSize,
      arrow::ipc::IpcWriteOptions& ipcWriteOptions,
      benchmark::State& state) {}

  void decompress(
      const arrow::ipc::IpcWriteOptions& ipcWriteOptions,
      const std::vector<std::shared_ptr<arrow::ipc::IpcPayload>>& payloads,
      const std::vector<std::vector<int64_t>>& uncompressedBufferSize,
      int64_t& decompressTime) {
    TIME_NANO_START(decompressTime);
    auto codec = ipcWriteOptions.codec;
    for (auto i = 0; i < payloads.size(); ++i) {
      auto& buffers = payloads[i]->body_buffers;
      for (auto j = 0; j < buffers.size(); ++j) {
        auto outputSize = uncompressedBufferSize[i][j];
        if (buffers[j] && outputSize >= 0) {
          GLUTEN_ASSIGN_OR_THROW(auto out, arrow::AllocateResizableBuffer(outputSize, ipcWriteOptions.memory_pool));
          // Exclude the first 8-byte buffer metadata.
          GLUTEN_ASSIGN_OR_THROW(
              auto len,
              codec->Decompress(buffers[j]->size() - 8, buffers[j]->data() + 8, outputSize, out->mutable_data()));
          (void)len;
        }
      }
    }
    TIME_NANO_END(decompressTime);
  }

 protected:
  std::shared_ptr<arrow::io::RandomAccessFile> file_;
  std::vector<int> rowGroupIndices_;
  std::vector<int> columnIndices_;
  std::shared_ptr<arrow::Schema> schema_;
  parquet::ArrowReaderProperties properties_;
};
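
// Two scan strategies are exercised below. CacheScan decodes every record batch from the
// Parquet file once, before the timed loop, so the measured work is essentially IPC
// compression plus decompression. IterateScan re-creates the record batch reader inside
// the timed loop, so Parquet decoding is interleaved with compression.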
class BenchmarkCompressionCacheScanBenchmark final : public BenchmarkCompression {
 public:
  explicit BenchmarkCompressionCacheScanBenchmark(const std::string& filename, uint32_t compressBufferSize)
      : BenchmarkCompression(filename, compressBufferSize) {}

  std::string name() const override {
    return "CacheScan";
  }

 protected:
  void doCompress(
      int64_t& elapseRead,
      int64_t& numBatches,
      int64_t& numRows,
      int64_t& compressTime,
      int64_t& decompressTime,
      int64_t& uncompressedSize,
      int64_t& compressedSize,
      arrow::ipc::IpcWriteOptions& ipcWriteOptions,
      benchmark::State& state) override {
    std::shared_ptr<arrow::RecordBatch> recordBatch;

    std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
    std::shared_ptr<RecordBatchReader> recordBatchReader;
    GLUTEN_THROW_NOT_OK(::parquet::arrow::FileReader::Make(
        arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));

    // Materialize all record batches up front so the timed loop excludes Parquet parsing.
    std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
    GLUTEN_THROW_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_, columnIndices_, &recordBatchReader));
    do {
      TIME_NANO_OR_THROW(elapseRead, recordBatchReader->ReadNext(&recordBatch));
      if (recordBatch) {
        batches.push_back(recordBatch);
        numBatches += 1;
        numRows += recordBatch->num_rows();
      }
    } while (recordBatch);
    LOG(INFO) << "parquet parse done, elapsed time " << elapseRead / 1e6 << " ms";
    LOG(INFO) << "batches = " << numBatches << " rows = " << numRows;

    std::vector<std::shared_ptr<arrow::ipc::IpcPayload>> payloads(batches.size());
    std::vector<std::vector<int64_t>> uncompressedBufferSize(batches.size());
    for (auto _ : state) {
      auto it = batches.begin();
      auto pos = 0;
      while (it != batches.end()) {
        recordBatch = *it++;
        // Record each buffer's uncompressed size (-1 for absent buffers) so decompress()
        // knows how much output space to allocate. Clear first so repeated benchmark
        // iterations do not accumulate entries.
        uncompressedBufferSize[pos].clear();
        for (auto i = 0; i < recordBatch->num_columns(); ++i) {
          recordBatch->column(i)->data()->buffers[0] = nullptr;
          for (auto& buffer : recordBatch->column(i)->data()->buffers) {
            if (buffer) {
              uncompressedBufferSize[pos].push_back(buffer->size());
            } else {
              uncompressedBufferSize[pos].push_back(-1);
            }
          }
        }
        auto payload = std::make_shared<arrow::ipc::IpcPayload>();
        TIME_NANO_OR_THROW(
            compressTime, arrow::ipc::GetRecordBatchPayload(*recordBatch, ipcWriteOptions, payload.get()));
        uncompressedSize += payload->raw_body_length;
        compressedSize += payload->body_length;
        payloads[pos] = std::move(payload);
        pos++;
      }
      decompress(ipcWriteOptions, payloads, uncompressedBufferSize, decompressTime);
    }
  }
};

class BenchmarkCompressionIterateScanBenchmark final : public BenchmarkCompression {
 public:
  explicit BenchmarkCompressionIterateScanBenchmark(const std::string& filename, uint32_t compressBufferSize)
      : BenchmarkCompression(filename, compressBufferSize) {}

  std::string name() const override {
    return "IterateScan";
  }

 protected:
  void doCompress(
      int64_t& elapseRead,
      int64_t& numBatches,
      int64_t& numRows,
      int64_t& compressTime,
      int64_t& decompressTime,
      int64_t& uncompressedSize,
      int64_t& compressedSize,
      arrow::ipc::IpcWriteOptions& ipcWriteOptions,
      benchmark::State& state) override {
    std::shared_ptr<arrow::RecordBatch> recordBatch;

    std::unique_ptr<::parquet::arrow::FileReader> parquetReader;
    std::shared_ptr<RecordBatchReader> recordBatchReader;
    GLUTEN_THROW_NOT_OK(::parquet::arrow::FileReader::Make(
        arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader));

    for (auto _ : state) {
      GLUTEN_THROW_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_, columnIndices_, &recordBatchReader));
      TIME_NANO_OR_THROW(elapseRead, recordBatchReader->ReadNext(&recordBatch));
      std::vector<std::shared_ptr<arrow::ipc::IpcPayload>> payloads;
      std::vector<std::vector<int64_t>> uncompressedBufferSize;
      while (recordBatch) {
        numBatches += 1;
        // One entry of buffer sizes per batch processed in this iteration.
        uncompressedBufferSize.emplace_back();
        numRows += recordBatch->num_rows();
        for (auto i = 0; i < recordBatch->num_columns(); ++i) {
          recordBatch->column(i)->data()->buffers[0] = nullptr;
          for (auto& buffer : recordBatch->column(i)->data()->buffers) {
            if (buffer) {
              uncompressedBufferSize.back().push_back(buffer->size());
            } else {
              uncompressedBufferSize.back().push_back(-1);
            }
          }
        }
        auto payload = std::make_shared<arrow::ipc::IpcPayload>();
        TIME_NANO_OR_THROW(
            compressTime, arrow::ipc::GetRecordBatchPayload(*recordBatch, ipcWriteOptions, payload.get()));
        uncompressedSize += payload->raw_body_length;
        compressedSize += payload->body_length;
        TIME_NANO_OR_THROW(elapseRead, recordBatchReader->ReadNext(&recordBatch));
        payloads.push_back(std::move(payload));
      }
      decompress(ipcWriteOptions, payloads, uncompressedBufferSize, decompressTime);
    }
  }
};

} // namespace gluten
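
// Standalone driver. The hand-rolled loop below only recognizes the flags listed in it;
// Google Benchmark's own --benchmark_* flags are handled later by benchmark::Initialize().
// A typical invocation might look like the following (binary name and file path are
// illustrative only):
//
//   ./compression_benchmark --file /tmp/lineitem.parquet --threads 4 --iterations 10 \
//       --zstd --buffer-size 32768 --cpu-offset 0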
int main(int argc, char** argv) {
  uint32_t iterations = 1;
  uint32_t threads = 1;
  uint32_t cpuOffset = 0;
  std::string datafile;
  auto codec = gluten::kLZ4;
  uint32_t compressBufferSize = 4096;

  for (int i = 0; i < argc; i++) {
    if (strcmp(argv[i], "--iterations") == 0) {
      iterations = atol(argv[i + 1]);
    } else if (strcmp(argv[i], "--threads") == 0) {
      threads = atol(argv[i + 1]);
    } else if (strcmp(argv[i], "--file") == 0) {
      datafile = argv[i + 1];
    } else if (strcmp(argv[i], "--qat-gzip") == 0) {
      LOG(INFO) << "QAT gzip is used as codec";
      codec = gluten::kQatGzip;
    } else if (strcmp(argv[i], "--qat-zstd") == 0) {
      LOG(INFO) << "QAT zstd is used as codec";
      codec = gluten::kQatZstd;
    } else if (strcmp(argv[i], "--qpl-gzip") == 0) {
      LOG(INFO) << "QPL gzip is used as codec";
      codec = gluten::kQplGzip;
    } else if (strcmp(argv[i], "--zstd") == 0) {
      LOG(INFO) << "CPU zstd is used as codec";
      codec = gluten::kZstd;
    } else if (strcmp(argv[i], "--buffer-size") == 0) {
      compressBufferSize = atol(argv[i + 1]);
    } else if (strcmp(argv[i], "--cpu-offset") == 0) {
      cpuOffset = atol(argv[i + 1]);
    }
  }
  LOG(INFO) << "iterations = " << iterations;
  LOG(INFO) << "threads = " << threads;
  LOG(INFO) << "datafile = " << datafile;

  gluten::BenchmarkCompressionIterateScanBenchmark bmIterateScan(datafile, compressBufferSize);
  gluten::BenchmarkCompressionCacheScanBenchmark bmCacheScan(datafile, compressBufferSize);

  benchmark::RegisterBenchmark(bmIterateScan.name().c_str(), bmIterateScan)
      ->Iterations(iterations)
      ->Args({
          codec,
          compressBufferSize,
          cpuOffset,
      })
      ->Threads(threads)
      ->ReportAggregatesOnly(false)
      ->MeasureProcessCPUTime()
      ->Unit(benchmark::kSecond);

  benchmark::RegisterBenchmark(bmCacheScan.name().c_str(), bmCacheScan)
      ->Iterations(iterations)
      ->Args({
          codec,
          compressBufferSize,
          cpuOffset,
      })
      ->Threads(threads)
      ->ReportAggregatesOnly(false)
      ->MeasureProcessCPUTime()
      ->Unit(benchmark::kSecond);

  benchmark::Initialize(&argc, argv);
  benchmark::RunSpecifiedBenchmarks();
  benchmark::Shutdown();
}