http/get_compressed/python/client/client.py (64 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import time
import urllib.request

import pyarrow as pa

URI = "http://localhost:8008"
ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream"


def make_request(uri, compression):
    coding = "identity" if compression.startswith("identity") else compression
    # urllib.request.urlopen() always sends an HTTP/1.1 request with
    # Accept-Encoding: identity, so we need to set up a request object
    # with custom headers to request a specific compression.
    headers = {
        "Accept-Encoding": f"{coding}, *;q=0",
    }
    if compression.startswith("identity+"):
        # Request IPC buffer compression instead of HTTP compression.
        ipc_codec = compression.split("+")[1]
        headers["Accept"] = f'{ARROW_STREAM_FORMAT};codecs="{ipc_codec}"'
    request = urllib.request.Request(uri, headers=headers)
    response = urllib.request.urlopen(request)
    content_type = response.headers["Content-Type"]
    if not content_type.startswith(ARROW_STREAM_FORMAT):
        raise ValueError(f"Expected {ARROW_STREAM_FORMAT}, got {content_type}")
    if compression.startswith("identity"):
        return response
    # The IANA content-coding name for Brotli is "br", but pyarrow's
    # codec name is "brotli".
    compression = "brotli" if compression == "br" else compression
    return pa.CompressedInputStream(response, compression)


def request_and_process(uri, compression):
    batches = []
    log_prefix = f"{'[' + compression + ']':>10}:"
    print(
        f"{log_prefix} Requesting data from {uri} with `{compression}` compression strategy."
    )
    start_time = time.time()
    response = make_request(uri, compression)
    with pa.ipc.open_stream(response) as reader:
        schema = reader.schema
        time_to_schema = time.time() - start_time
        try:
            batch = reader.read_next_batch()
            time_to_first_batch = time.time() - start_time
            batches.append(batch)
            while True:
                batch = reader.read_next_batch()
                batches.append(batch)
        except StopIteration:
            pass
        processing_time = time.time() - start_time
        reader_stats = reader.stats
    print(
        f"{log_prefix} Schema received in {time_to_schema:.3f} seconds."
        f" schema=({', '.join(schema.names)})."
    )
    print(
        f"{log_prefix} First batch received and processed in"
        f" {time_to_first_batch:.3f} seconds."
    )
    print(
        f"{log_prefix} Processing of all batches completed in"
        f" {processing_time:.3f} seconds."
    )
    print(f"{log_prefix}", reader_stats)
    return batches


# HTTP compression
request_and_process(URI, "identity")
request_and_process(URI, "zstd")
request_and_process(URI, "br")
request_and_process(URI, "gzip")
# IPC buffer compression instead of HTTP compression
request_and_process(URI, "identity+zstd")
request_and_process(URI, "identity+lz4")
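
# A minimal follow-up sketch (not part of the original example): the record
# batches returned by request_and_process() can be combined into a single
# pyarrow Table for further processing. Uncomment to try it; the compression
# strategy chosen here is arbitrary.
#
# batches = request_and_process(URI, "zstd")
# table = pa.Table.from_batches(batches)
# print(f"Combined {len(batches)} batches into {table.num_rows} rows.")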