http/get_multipart/python/client/simple_client.py (99 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from email import policy import email import json import pyarrow as pa import sys import time import urllib.request JSON_FORMAT = "application/json" TEXT_FORMAT = "text/plain" ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream" start_time = time.time() response_parsing_time = 0 # time to parse the multipart message arrow_stream_parsing_time = 0 # time to parse the Arrow stream def parse_multipart_message(response, boundary, buffer_size=8192): """ Parse a multipart/mixed HTTP response into a list of Message objects. Returns ------- list of email.message.Message containing the parts of the multipart message. """ global response_parsing_time buffer_size = max(buffer_size, 8192) buffer = bytearray(buffer_size) header = f'MIME-Version: 1.0\r\nContent-Type: multipart/mixed; boundary="{boundary}"\r\n\r\n' feedparser = email.parser.BytesFeedParser(policy=policy.default) feedparser.feed(header.encode("utf-8")) while bytes_read := response.readinto(buffer): start_time = time.time() feedparser.feed(buffer[0:bytes_read]) response_parsing_time += time.time() - start_time start_time = time.time() message = feedparser.close() response_parsing_time += time.time() - start_time assert message.is_multipart() return message.get_payload() def process_json_part(message): assert message.get_content_type() == JSON_FORMAT payload = part.get_payload() print(f"-- {len(payload)} bytes of JSON data:") try: PREVIW_SIZE = 5 data = json.loads(payload) print("[") for i in range(min(PREVIW_SIZE, len(data))): print(f" {data[i]}") if len(data) > PREVIW_SIZE: print(f" ...+{len(data) - PREVIW_SIZE} entries...") print("]") except json.JSONDecodeError as e: print(f"Error parsing JSON data: {e}\n", file=sys.stderr) return data def process_arrow_stream_message(message): global arrow_stream_parsing_time assert message.get_content_type() == ARROW_STREAM_FORMAT payload = part.get_payload(decode=True) print(f"-- {len(payload)} bytes of Arrow data:") num_batches = 0 num_records = 0 start_time = time.time() with pa.ipc.open_stream(payload) as reader: schema = reader.schema print(f"Schema: \n{schema}\n") try: while True: batch = reader.read_next_batch() num_batches += 1 num_records += batch.num_rows except StopIteration: pass arrow_stream_parsing_time = time.time() - start_time print(f"Parsed {num_records} records in {num_batches} batch(es)") def process_text_part(message): assert message.get_content_type() == TEXT_FORMAT payload = part.get_payload() print("-- Text Message:") print(payload, end="") print("-- End of Text Message --") response = urllib.request.urlopen("http://localhost:8008?include_footnotes") content_type = response.headers.get_content_type() if content_type != "multipart/mixed": raise ValueError(f"Expected multipart/mixed Content-Type, got {content_type}") boundary = response.headers.get_boundary() if boundary is None or len(boundary) == 0: raise ValueError("No multipart boundary found in Content-Type header") parts = parse_multipart_message(response, boundary, buffer_size=64 * 1024) batches = None for part in parts: content_type = part.get_content_type() if content_type == JSON_FORMAT: process_json_part(part) elif content_type == ARROW_STREAM_FORMAT: batches = process_arrow_stream_message(part) elif content_type == TEXT_FORMAT: process_text_part(part) end_time = time.time() execution_time = end_time - start_time rel_response_parsing_time = response_parsing_time / execution_time rel_arrow_stream_parsing_time = arrow_stream_parsing_time / execution_time print(f"{execution_time:.3f} seconds elapsed") print( f"""{response_parsing_time:.3f} seconds \ ({rel_response_parsing_time * 100:.2f}%) \ seconds parsing multipart/mixed response""" ) print( f"""{arrow_stream_parsing_time:.3f} seconds \ ({rel_arrow_stream_parsing_time * 100:.2f}%) \ seconds parsing Arrow stream""" )