benchmarks/perf_storage_api_arrow.py (120 lines of code) (raw):

# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import sys
import threading
import time

import pytest

from odps.apis.storage_api.conftest import storage_api_client  # noqa: F401

if sys.version_info[0] == 3:
    from odps.apis.storage_api import *
else:
    pytestmark = pytest.mark.skip("Need python3.5+ to run this test")

logger = logging.getLogger(__name__)

# Guards updates to ``global_total_record`` made by the reader threads.
lock = threading.Lock()
# Running number of rows read by all reader threads.
global_total_record = 0
# Number of reader threads started by ``test_read_thread``.
thread_num = 1
# 0 -> UNCOMPRESSED, 1 -> ZSTD, anything else (including None) -> LZ4_FRAME.
compress_type = None


def read_rows(storage_api_client, compress_type, read_session_id, split_count):
    """Read every split of a read session and count the rows received.

    The row count of each record batch is added to the module-level
    ``global_total_record`` under ``lock``.

    :param storage_api_client: client object providing ``read_rows_arrow``.
    :param compress_type: 0 selects ``Compression.UNCOMPRESSED``, 1 selects
        ``Compression.ZSTD``, any other value selects
        ``Compression.LZ4_FRAME``.
    :param read_session_id: id of the read session to read from.
    :param split_count: number of splits; indexes ``0 .. split_count - 1``
        are read in order.
    :return: ``False`` as soon as a reader reports a status other than
        ``Status.OK``, ``True`` once all splits were read.
    """
    global global_total_record

    if compress_type == 0:
        compression = Compression.UNCOMPRESSED
    elif compress_type == 1:
        compression = Compression.ZSTD
    else:
        compression = Compression.LZ4_FRAME

    req = ReadRowsRequest(session_id=read_session_id, compression=compression)

    for split_index in range(split_count):
        req.split_index = split_index
        start = time.time()
        reader = storage_api_client.read_rows_arrow(req)

        while True:
            record_batch = reader.read()
            if record_batch is None:
                # ``None`` marks the end of the stream for this split.
                break
            with lock:
                global_total_record += record_batch.num_rows

        if reader.get_status() != Status.OK:
            logger.info("Read rows failed")
            return False

        elapsed = time.time() - start
        logger.info("Read rows cost (index %s): %ss", split_index, elapsed)

    return True


def read_performance(storage_api_client):
    """Thread body: create a read session, wait for it, then read all rows.

    Uses the module-level ``compress_type`` when reading.

    :raises AssertionError: if the session could not be created or
        ``get_read_session`` did not return ``True``.
    """
    read_session_id, split_count = create_read_session(storage_api_client)
    assert read_session_id is not None
    assert get_read_session(storage_api_client, read_session_id) is True

    read_rows(storage_api_client, compress_type, read_session_id, split_count)


def test_read_thread(storage_api_client):
    """Run ``thread_num`` reader threads and log the read throughput.

    For 20 iterations the function sleeps one second, then logs the number
    of records read since the previous iteration divided by the elapsed
    time. Once an iteration has shown progress, the rates of up to five
    subsequent iterations are averaged and logged at the end.

    :raises ValueError: if ``storage_api_client.table`` equals ``"None"``.
    """
    # ``table`` is kept as a module-level global, as in the original code.
    global table
    global global_total_record

    table = storage_api_client.table
    if table == "None":
        logger.info("No table name specified")
        raise ValueError("Please input table name")

    read_performance_threads = [
        threading.Thread(target=read_performance, args=[storage_api_client])
        for _ in range(thread_num)
    ]

    start = time.time()
    for worker in read_performance_threads:
        worker.start()

    count = 0
    start_count = 0
    cal_total_count = 0
    cal_count = 0
    # Becomes True after the first interval in which any record was read.
    judge = False
    while count < 20:
        time.sleep(1)
        now = time.time()
        now_count = global_total_record
        rate = (now_count - start_count) / (now - start)
        logger.info("index: %d, read, %f records per second", count, rate)

        # Only intervals after the first productive one enter the average.
        if judge and cal_count < 5:
            cal_total_count += rate
            cal_count += 1

        if now_count - start_count > 0:
            judge = True
        start_count = now_count
        start = now
        count += 1

    if cal_count == 5:
        logger.info("average count: %f", cal_total_count / 5.0)
    else:
        logger.info("less than 5 valid result generated.")

    for worker in read_performance_threads:
        worker.join()


def create_read_session(storage_api_client):
    """Create a read session restricted to partition ``pt=test_write_1``.

    :return: ``(session_id, split_count)`` taken from the response, or
        ``(None, None)`` if the call raised or the response status is
        neither ``Status.OK`` nor ``Status.WAIT``.
    """
    req = TableBatchScanRequest()
    req.required_partitions = ["pt=test_write_1"]

    try:
        resp = storage_api_client.create_read_session(req)
    except Exception as e:
        logger.info(e)
        return None, None

    if resp.status != Status.OK and resp.status != Status.WAIT:
        logger.info("create read session failed")
        return None, None
    return resp.session_id, resp.split_count


def get_read_session(storage_api_client, read_session_id):
    """Poll a read session until it is no longer in ``SessionStatus.INIT``.

    :return: ``True`` once a response with ``Status.OK`` reports a session
        status other than ``INIT``; ``False`` if the call raised or the
        response status is not ``Status.OK``.
    """
    req = SessionRequest(session_id=read_session_id)

    # NOTE(review): there is no upper bound on the number of polls; a
    # session that stays in INIT keeps this loop running.
    while True:
        try:
            resp = storage_api_client.get_read_session(req)
        except Exception as e:
            logger.info(e)
            return False

        if resp.status != Status.OK:
            logger.info("get read session failed")
            return False

        if resp.session_status == SessionStatus.INIT:
            logger.info("Wait...")
            time.sleep(1)
            continue

        break

    return True