http/get_multipart/python/server/server.py (177 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import io
import json
import secrets
import string
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from random import choice, randint
from urllib.parse import parse_qs, urlsplit

import pyarrow as pa

# configuration: use chunked transfer encoding for HTTP/1.1 responses?
CHUNKED_ENCODING = True


def random_string(alphabet, length):
    """Return a string of `length` characters drawn at random from `alphabet`."""
    return "".join(choice(alphabet) for _ in range(length))


def random_name(initial):
    """Return `initial` followed by 3 to 7 random lowercase ASCII letters."""
    length = randint(3, 7)
    return initial + random_string(string.ascii_lowercase, length)


def example_tickers(num_tickers):
    """
    Return a list of `num_tickers` distinct random ticker symbols.

    Each ticker is 3 or 4 uppercase ASCII letters. Tickers are returned in the
    order in which they were generated.
    """
    tickers = []
    # set used only for O(1) duplicate detection; the list keeps the order
    seen = set()
    while len(tickers) < num_tickers:
        length = randint(3, 4)
        random_ticker = random_string(string.ascii_uppercase, length)
        if random_ticker not in seen:
            seen.add(random_ticker)
            tickers.append(random_ticker)
    return tickers


def example_json_data(tickers):
    """
    Build the JSON-serializable header describing every ticker.

    Returns a list with one dict per ticker, holding the "ticker" itself and a
    "description" made of one random name per ticker letter, each name
    starting with that letter.
    """
    return [
        {
            "ticker": ticker,
            # one random word per letter of the ticker, computed exactly once
            "description": " ".join(random_name(c) for c in ticker),
        }
        for ticker in tickers
    ]


the_schema = pa.schema(
    [
        ("ticker", pa.utf8()),
        ("price", pa.int64()),
        ("volume", pa.int64()),
    ]
)


def example_batch(tickers, length):
    """
    Return a record batch of `length` random rows conforming to `the_schema`.

    Each row has a ticker chosen from `tickers`, a price that is a multiple of
    100 between 100 and 100000, and a volume between 1 and 10000.
    """
    data = {"ticker": [], "price": [], "volume": []}
    for _ in range(length):
        data["ticker"].append(choice(tickers))
        data["price"].append(randint(1, 1000) * 100)
        data["volume"].append(randint(1, 10000))

    return pa.RecordBatch.from_pydict(data, the_schema)


def example_batches(tickers, total_records=42_000_000, batch_len=6 * 1024):
    """
    Return a list of record batches holding `total_records` rows in total.

    Every batch has `batch_len` rows, except possibly the last one which holds
    the remainder. All batches are random slices of a single larger base
    batch, so generating them is cheap.

    Raises ValueError if `batch_len` is not positive.
    """
    # the default parameters are chosen to generate a response
    # of ~1 GB and chunks of ~140 KB.
    if batch_len < 1:
        # a non-positive batch length would never make progress below
        raise ValueError("batch_len must be a positive integer")
    # all the batches sent are random slices of the larger base batch
    base_batch = example_batch(tickers, length=8 * batch_len)
    batches = []
    records = 0
    while records < total_records:
        length = min(batch_len, total_records - records)
        # randint() is inclusive on both ends, so the largest valid offset
        # is exactly num_rows - length
        offset = randint(0, base_batch.num_rows - length)
        batches.append(base_batch.slice(offset, length))
        records += length
    return batches


# end of example data generation


def random_multipart_boundary():
    """
    Generate a random boundary string for a multipart response.

    Uses a cryptographically secure random number generator to generate a
    random boundary string for a multipart response. The boundary string has
    enough entropy to make it practically impossible that it will be repeated
    in the response body.

    Use a new boundary string for each multipart response so that once the
    secret is revealed to the client, it won't be possible to exploit it to
    create a malicious response.
    """
    # 28 bytes (224 bits) of entropy is enough to make a collision
    # practically impossible. See [1] for a mathematical discussion.
    #
    # The 28 bytes are encoded into URL-safe characters (alphanumeric, -, and _)
    # so the string ends up longer than 28 characters. RFC1341 [2] recommends a
    # maximum boundary length of 70 characters, so we're well within that limit.
    #
    # [1] https://preshing.com/20110504/hash-collision-probabilities/
    # [2] https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html
    return secrets.token_urlsafe(28)


def gen_arrow_multipart_buffers(boundary, schema, source, is_last_part=False):
    """
    Generate buffers for the Arrow Stream part of a multipart response.

    That is, an HTTP response started with the header:

        Content-type: multipart/mixed; boundary=the_boundary_string

    The buffers, when taken together, will form the following structure:

        --the_boundary_string<CR><LF>
        Content-Type: application/vnd.apache.arrow.stream<CR><LF>
        <CR><LF>
        <Arrow Stream data>
        <CR><LF>

    If is_last_part is True, the boundary string will be appended with two
    hyphens at the end of the last buffer to indicate the end of the multipart
    response:

        --the_boundary_string--<CR><LF>

    `source` is an iterable of record batches matching `schema`.

    Each yielded buffer is a memoryview over a single reused in-memory sink:
    it is released as soon as the generator is resumed, so the consumer must
    finish with (or copy) a buffer before asking for the next one.
    """
    with io.BytesIO() as sink, pa.ipc.new_stream(sink, schema) as writer:
        # NOTE(review): the part header is written before any batch; this
        # relies on the IPC writer not emitting the schema into the sink until
        # the first batch is written (or the writer is closed) -- confirm
        # against the pyarrow version in use.
        sink.write(
            f"--{boundary}\r\n"
            "Content-Type: application/vnd.apache.arrow.stream\r\n"
            "\r\n".encode("utf-8")
        )
        for batch in source:
            writer.write_batch(batch)
            # drop leftovers from a previous, longer chunk: the sink is
            # rewound and reused for every chunk
            sink.truncate()
            with sink.getbuffer() as buffer:
                yield buffer
            sink.seek(0)

        # closing the writer appends whatever it still has to write, so it
        # has to happen before the part is terminated
        writer.close()
        sink.write("\r\n".encode("utf-8"))
        if is_last_part:
            sink.write(f"--{boundary}--\r\n".encode("utf-8"))
        sink.truncate()
        with sink.getbuffer() as buffer:
            yield buffer


def gen_json_multipart_buffers(boundary, json_data, is_last_part=False):
    """
    Generate buffers for the JSON part of a multipart response.

    That is, an HTTP response started with the header:

        Content-type: multipart/mixed; boundary=the_boundary_string

    The buffer will have the following structure:

        --the_boundary_string<CR><LF>
        Content-Type: application/json<CR><LF>
        <CR><LF>
        <serialized JSON data>
        <CR><LF>

    If is_last_part is True, the boundary string will be appended with two
    hyphens at the end of the buffer to indicate the end of the multipart
    response:

        --the_boundary_string--<CR><LF>

    Allocation of a big string for the JSON data is avoided by appending the
    JSON data directly to the same output buffer.

    A single buffer is yielded. It is a memoryview that is released once the
    generator is resumed, so it must be consumed before that.
    """
    with io.BytesIO() as sink:
        # write_through=True makes every text write land in `sink` right away,
        # so the buffer taken below is complete without an explicit flush
        with io.TextIOWrapper(sink, encoding="utf-8", write_through=True) as wrapper:
            wrapper.write(f"--{boundary}\r\n" "Content-Type: application/json\r\n\r\n")
            json.dump(json_data, wrapper)
            wrapper.write("\r\n")
            if is_last_part:
                wrapper.write(f"--{boundary}--\r\n")
            with sink.getbuffer() as buffer:
                yield buffer


def multipart_buffer_from_string(boundary, content_type, text, is_last_part=False):
    """
    Return a complete multipart part, as UTF-8 bytes, wrapping `text`.

    The part consists of the boundary line, a Content-Type header set to
    `content_type`, an empty line, `text` and a trailing <CR><LF>. If
    is_last_part is True, the closing delimiter of the multipart response
    (--the_boundary_string--<CR><LF>) is appended.
    """
    close_delimiter = f"--{boundary}--\r\n" if is_last_part else ""
    return (
        f"--{boundary}\r\n"
        f"Content-Type: {content_type}\r\n\r\n"
        f"{text}\r\n{close_delimiter}".encode("utf-8")
    )


class MyRequestHandler(BaseHTTPRequestHandler):
    """
    Multipart response handler for a simple HTTP server.

    This HTTP request handler serves a multipart/mixed response containing
    a JSON data part, followed by an Arrow Stream part and an optional text
    footer as the last part.

    The Arrow data is randomly generated "trading data" with a schema
    consisting of a ticker, price (in cents), and volume. The JSON header
    contains all the tickers and their descriptions. This could be returned
    as an Arrow table as well, but to illustrate the use of multiple parts
    in a response, it is sent as JSON.

    To make things more... mixed, a third part is added to the response:
    a plaintext footer containing footnotes about the request. This part is
    optional and only included if the client requests it by sending a query
    parameter `include_footnotes`.
    """

    # whether the current request asked for the plaintext footer
    _include_footnotes = False
    # time.perf_counter() readings taken around the Arrow part of the response
    _start_arrow_stream_time = None
    _end_arrow_stream_time = None
    # statistics about the Arrow part of the current response
    _number_of_arrow_data_chunks = 0
    _bytes_sent_on_arrow_stream = 0

    def _resolve_json_data_header(self):
        """Return the JSON-serializable data sent as the first part."""
        return the_json_data

    def _resolve_batches(self):
        """Return a reader over all the example record batches."""
        return pa.RecordBatchReader.from_batches(the_schema, all_batches)

    def _build_footnotes(self):
        """
        Return the plaintext footer describing how the Arrow part was sent.

        Must be called after the Arrow part has been fully generated, since it
        reads the timings and counters recorded by `_gen_buffers`.
        """
        num_batches = len(all_batches)
        elapsed_time = self._end_arrow_stream_time - self._start_arrow_stream_time
        num_chunks = self._number_of_arrow_data_chunks
        # guard the average so the footer can be built even with no chunks
        avg_chunk_size = (
            self._bytes_sent_on_arrow_stream / num_chunks if num_chunks else 0.0
        )
        text = (
            f"Hello Client,\n\n{num_batches} Arrow batch(es) were sent in "
            f"{elapsed_time:.3f} seconds through {num_chunks} HTTP\nresponse chunks. "
            f"Average size of each chunk was {avg_chunk_size:.2f} bytes.\n"
            "\n--\nSincerely,\nThe Server\n"
        )
        return text

    def _gen_buffers(self, boundary, json_header, schema, source):
        """
        Generate, in order, every buffer of the multipart response body.

        Yields the JSON part, then the Arrow Stream part and, when footnotes
        were requested, the plaintext footer. Timings and counters about the
        Arrow part are recorded on the handler while it is generated.
        """
        # start every response from a clean slate so the statistics can never
        # leak from one request into the next on the same handler instance
        self._number_of_arrow_data_chunks = 0
        self._bytes_sent_on_arrow_stream = 0
        self._end_arrow_stream_time = None

        # JSON header
        yield from gen_json_multipart_buffers(boundary, json_header)

        # Arrow data
        is_last_part = not self._include_footnotes
        # perf_counter() is monotonic, unlike the wall clock, so the elapsed
        # time cannot be skewed by system clock adjustments
        self._start_arrow_stream_time = time.perf_counter()
        for buffer in gen_arrow_multipart_buffers(
            boundary, schema, source, is_last_part=is_last_part
        ):
            self._number_of_arrow_data_chunks += 1
            self._bytes_sent_on_arrow_stream += len(buffer)
            yield buffer
        self._end_arrow_stream_time = time.perf_counter()

        # Footnotes (optional)
        if self._include_footnotes:
            footnotes = self._build_footnotes()
            yield multipart_buffer_from_string(
                boundary, "text/plain", footnotes, is_last_part=True
            )

    def do_GET(self):
        """
        Serve the multipart/mixed response for a GET request.

        The footer part is included when the query string of the request
        contains the `include_footnotes` parameter, with or without a value.
        HTTP/1.0 clients never get a chunked response; HTTP/1.1 clients get
        one when CHUNKED_ENCODING is enabled.
        """
        # keep_blank_values=True so that a bare "?include_footnotes" (no "=")
        # is still reported by parse_qs
        query = parse_qs(urlsplit(self.path).query, keep_blank_values=True)
        self._include_footnotes = "include_footnotes" in query

        ### in a real application the data would be resolved from a database or
        ### another source like a file and error handling would be done here
        ### before the 200 OK response starts being sent to the client.
        json_data_header = self._resolve_json_data_header()
        source = self._resolve_batches()

        if self.request_version == "HTTP/1.0":
            self.protocol_version = "HTTP/1.0"
            chunked = False
        else:
            self.protocol_version = "HTTP/1.1"
            chunked = CHUNKED_ENCODING

        self.send_response(200)
        boundary = random_multipart_boundary()
        self.send_header("Content-Type", f"multipart/mixed; boundary={boundary}")
        ### set these headers if testing with a local browser-based client:
        # self.send_header('Access-Control-Allow-Origin', 'http://localhost:8008')
        # self.send_header('Access-Control-Allow-Methods', 'GET')
        # self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        if chunked:
            self.send_header("Transfer-Encoding", "chunked")
        else:
            # without chunking (and without a Content-Length) the end of the
            # body is only signalled by closing the connection: say so
            self.send_header("Connection", "close")
        self.end_headers()

        for buffer in self._gen_buffers(boundary, json_data_header, the_schema, source):
            if len(buffer) == 0:
                # a zero-size chunk would terminate a chunked body early
                continue
            if chunked:
                self.wfile.write(f"{len(buffer):X}\r\n".encode("utf-8"))
            self.wfile.write(buffer)
            if chunked:
                self.wfile.write("\r\n".encode("utf-8"))
            self.wfile.flush()

        if chunked:
            # last chunk: marks the end of the chunked body
            self.wfile.write("0\r\n\r\n".encode("utf-8"))
            self.wfile.flush()


print("Generating example data...")

all_tickers = example_tickers(60)
all_batches = example_batches(all_tickers)
the_json_data = example_json_data(all_tickers)

server_address = ("localhost", 8008)
# the context manager closes the listening socket on the way out, whatever
# the reason for leaving
with HTTPServer(server_address, MyRequestHandler) as httpd:
    print(f"Serving on {server_address[0]}:{server_address[1]}...")
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("Shutting down server")