http/get_compressed/python/server/server.py (320 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from random import choice, randint from http.server import BaseHTTPRequestHandler, HTTPServer import io import pyarrow as pa import pyarrow.compute as pc import re import socketserver import string # use dictionary encoding for the ticker column USE_DICTIONARY_ENCODING = True def random_string(alphabet, length): return "".join(choice(alphabet) for _ in range(length)) def random_name(initial): length = randint(3, 7) return initial + random_string(string.ascii_lowercase, length) def example_tickers(num_tickers): tickers = [] while len(tickers) < num_tickers: length = randint(3, 4) random_ticker = random_string(string.ascii_uppercase, length) if random_ticker not in tickers: tickers.append(random_ticker) return tickers the_ticker_type = ( pa.dictionary(pa.int32(), pa.utf8()) if USE_DICTIONARY_ENCODING else pa.utf8() ) the_schema = pa.schema( [ ("ticker", the_ticker_type), ("price", pa.int64()), ("volume", pa.int64()), ] ) def example_batch(tickers, length): ticker_indices = [] price = [] volume = [] for _ in range(length): ticker_indices.append(randint(0, len(tickers) - 1)) price.append(randint(1, 1000) * 100) volume.append(randint(1, 10000)) ticker = ( pa.DictionaryArray.from_arrays(ticker_indices, tickers) if USE_DICTIONARY_ENCODING else pc.take(tickers, ticker_indices, boundscheck=False) ) return pa.RecordBatch.from_arrays([ticker, price, volume], schema=the_schema) def example_batches(tickers): # these parameters are chosen to generate a response # of ~1 GB and chunks of ~140 KB (uncompressed) total_records = 42_000_000 batch_len = 6 * 1024 # all the batches sent are random slices of the larger base batch base_batch = example_batch(tickers, length=8 * batch_len) batches = [] records = 0 while records < total_records: length = min(batch_len, total_records - records) offset = randint(0, base_batch.num_rows - length - 1) batch = base_batch.slice(offset, length) batches.append(batch) records += length return batches # end of example data generation # what the HTTP spec calls a token (any character except CTLs or separators) TOKEN_RE = r"(?:[A-Za-z0-9!#$%&'*+./^_`|~-]+)" # [L]inear [W]hite [S]pace pattern (HTTP/1.1 - RFC 2616) LWS_RE = r"(?:[ \t]|\r\n[ \t]+)*" TOKENIZER_PAT = re.compile( f"(?P<TOK>{TOKEN_RE})" r'|(?P<QUOTED>"([^"\\]|\\.)*")' # a quoted string (escaped pairs allowed) r"|(?P<COMMA>,)" r"|(?P<SEMI>;)" r"|(?P<EQ>=)" f"|(?P<SKIP>{LWS_RE})" # LWS is skipped r"|(?P<MISMATCH>.+)", flags=re.ASCII, # HTTP headers are encoded in ASCII ) def parse_header_value(header_name, header_value): """ Parse the Accept or Accept-Encoding request header values. Returns ------- list of (str, dict) The list of lowercase tokens and their parameters in the order they appear in the header. The parameters are stored in a dictionary where the keys are the parameter names and the values are the parameter values. If a parameter is not followed by an equal sign and a value, the value is None. """ def unexpected(label, value): msg = f"Malformed {header_name} header: unexpected {label} at {value!r}" return ValueError(msg) def tokenize(): for mo in re.finditer(TOKENIZER_PAT, header_value): kind = mo.lastgroup if kind == "SKIP": continue elif kind == "MISMATCH": raise unexpected("character", mo.group()) yield (kind, mo.group()) tokens = tokenize() def expect(expected_kind): kind, text = next(tokens) if kind != expected_kind: raise unexpected("token", text) return text accepted = [] while True: try: name, params = None, {} name = expect("TOK").lower() kind, text = next(tokens) while True: if kind == "COMMA": accepted.append((name, params)) break if kind == "SEMI": ident = expect("TOK") params[ident] = None # init param to None kind, text = next(tokens) if kind != "EQ": continue kind, text = next(tokens) if kind in ["TOK", "QUOTED"]: if kind == "QUOTED": text = text[1:-1] # remove the quotes params[ident] = text # set param to value kind, text = next(tokens) continue raise unexpected("token", text) except StopIteration: break if name is not None: # any unfinished ;param=value sequence or trailing separators are ignored accepted.append((name, params)) return accepted ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream" def pick_ipc_codec(accept_header, available, default): """ Pick the IPC stream codec according to the Accept header. This is used when deciding which codec to use for compression of IPC buffer streams. This is a feature of the Arrow IPC stream format and is different from the HTTP content-coding used to compress the entire HTTP response. This is how a client may specify the IPC buffer compression codecs it accepts: Accept: application/vnd.apache.arrow.stream; codecs="zstd, lz4" Parameters ---------- accept_header : str|None The value of the Accept header from an HTTP request. available : list of str The codecs that the server can provide in the order preferred by the server. Example: ["zstd", "lz4"]. default : str|None The codec to use if the client does not specify the ";codecs" parameter in the Accept header. Returns ------- str|None The codec that the server should use to compress the IPC buffer stream. None if the client does not accept any of the available codecs explicitly listed. ;codecs="" means no codecs are accepted. If the client does not specify the codecs parameter, the default codec is returned. """ did_specify_codecs = False accepted_codecs = [] if accept_header is not None: accepted = parse_header_value("Accept", accept_header) for media_range, params in accepted: if ( media_range == "*/*" or media_range == "application/*" or media_range == ARROW_STREAM_FORMAT ): did_specify_codecs = "codecs" in params codecs_str = params.get("codecs") if codecs_str is None: continue for codec in codecs_str.split(","): accepted_codecs.append(codec.strip()) for codec in available: if codec in accepted_codecs: return codec return None if did_specify_codecs else default def pick_coding(accept_encoding_header, available): """ Pick the content-coding according to the Accept-Encoding header. This is used when using HTTP response compression instead of IPC buffer compression. Parameters ---------- accept_encoding_header : str The value of the Accept-Encoding header from an HTTP request. available : list of str The content-codings that the server can provide in the order preferred by the server. Example: ["zstd", "br", "gzip"]. Returns ------- str The content-coding that the server should use to compress the response. "identity" is returned if no acceptable content-coding is found in the list of available codings. None if the client does not accept any of the available content-codings and doesn't accept "identity" (uncompressed) either. In this case, a "406 Not Acceptable" response should be sent. """ accepted = parse_header_value("Accept-Encoding", accept_encoding_header) def qvalue_or(params, default): qvalue = params.get("q") if qvalue is not None: try: return float(qvalue) except ValueError: raise ValueError(f"Invalid qvalue in Accept-Encoding header: {qvalue}") return default if "identity" not in available: available = available + ["identity"] state = {} for coding, params in accepted: qvalue = qvalue_or(params, 0.0001 if coding == "identity" else 1.0) if coding == "*": for coding in available: if coding not in state: state[coding] = qvalue elif coding in available: state[coding] = qvalue # "identity" is always acceptable unless explicitly refused (;q=0) if "identity" not in state: state["identity"] = 0.0001 # all the candidate codings are now in the state dictionary and we # have to consider only the ones that have the maximum qvalue max_qvalue = max(state.values()) if max_qvalue == 0.0: return None for coding in available: if coding in state and state[coding] == max_qvalue: return coding return None def pick_compression(headers, available_ipc_codecs, available_codings, default): """ Pick the compression strategy based on the Accept and Accept-Encoding headers. Parameters ---------- headers : dict The HTTP request headers. available_ipc_codecs : list of str The codecs that the server can provide for IPC buffer compression. available_codings : list of str The content-codings that the server can provide for HTTP response compression. default : str The default compression strategy to use if the client does explicitly choose. Returns ------- str|None The compression strategy to use. It can be one of the following: "identity": no compression at all. "identity+zstd": No HTTP compression + IPC buffer compression with Zstd. "identity+lz4": No HTTP compression + IPC buffer compression with LZ4. "zstd", "br", "gzip", ...: HTTP compression without IPC buffer compression. None means a "406 Not Acceptable" response should be sent. """ accept = headers.get("Accept") ipc_codec = pick_ipc_codec(accept, available_ipc_codecs, default=None) if ipc_codec is None: accept_encoding = headers.get("Accept-Encoding") return ( default if accept_encoding is None else pick_coding(accept_encoding, available_codings) ) return "identity+" + ipc_codec class LateClosingBytesIO(io.BytesIO): """ BytesIO that does not close on close(). When a stream wrapping a a file-like object is closed, the underlying file-like object is also closed. This subclass prevents that from happening by overriding the close method. If we close a RecordBatchStreamWriter wrapping a BytesIO object, we want to be able to create a memory view of the buffer. But that is only possible if the BytesIO object is not closed yet. """ def close(self): pass def close_now(self): super().close() class SocketWriterSink(socketserver._SocketWriter): """Wrapper to make wfile usable as a sink for Arrow stream writing.""" def __init__(self, wfile): self.wfile = wfile def writable(self): return True def write(self, b): self.wfile.write(b) def fileno(self): return self._sock.fileno() def close(self): """Do nothing so Arrow stream wrappers don't close the socket.""" pass def generate_chunk_buffers(schema, source, compression): # the sink holds the buffer and we give a view of it to the caller with LateClosingBytesIO() as sink: # keep buffering until we have at least MIN_BUFFER_SIZE bytes # in the buffer before yielding it to the caller. Setting it # to 1 means we yield as soon as the compression blocks are # formed and reach the sink buffer. MIN_BUFFER_SIZE = 64 * 1024 if compression.startswith("identity"): if compression == "identity+zstd": options = pa.ipc.IpcWriteOptions(compression="zstd") elif compression == "identity+lz4": options = pa.ipc.IpcWriteOptions(compression="lz4") else: options = None # source: RecordBatchReader # |> writer: RecordBatchStreamWriter # |> sink: LateClosingBytesIO writer = pa.ipc.new_stream(sink, schema, options=options) for batch in source: writer.write_batch(batch) if sink.tell() >= MIN_BUFFER_SIZE: sink.truncate() with sink.getbuffer() as buffer: yield buffer sink.seek(0) writer.close() # write EOS marker and flush else: compression = "brotli" if compression == "br" else compression with pa.CompressedOutputStream(sink, compression) as compressor: # has the first buffer been yielded already? sent_first = False # source: RecordBatchReader # |> writer: RecordBatchStreamWriter # |> compressor: CompressedOutputStream # |> sink: LateClosingBytesIO writer = pa.ipc.new_stream(compressor, schema) for batch in source: writer.write_batch(batch) # we try to yield a buffer ASAP no matter how small if not sent_first and sink.tell() == 0: compressor.flush() pos = sink.tell() if pos >= MIN_BUFFER_SIZE or (not sent_first and pos >= 1): sink.truncate() with sink.getbuffer() as buffer: yield buffer sink.seek(0) sent_first = True writer.close() # write EOS marker and flush compressor.close() sink.truncate() with sink.getbuffer() as buffer: yield buffer sink.close_now() AVAILABLE_IPC_CODECS = ["zstd", "lz4"] """List of available codecs Arrow IPC buffer compression.""" AVAILABLE_CODINGS = ["zstd", "br", "gzip"] """ List of available content-codings as used in HTTP. Note that Arrow stream classes refer to Brotli as "brotli" and not "br". """ class MyRequestHandler(BaseHTTPRequestHandler): """ Response handler for a simple HTTP server. This HTTP request handler serves a compressed HTTP response with an Arrow stream in it or a (TODO) compressed Arrow stream in a uncompressed HTTP response. The Arrow data is randomly generated "trading data" with a schema consisting of a ticker, price (in cents), and volume. """ def _resolve_batches(self): return pa.RecordBatchReader.from_batches(the_schema, all_batches) def _send_not_acceptable(self, parsing_error=None): self.send_response(406, "Not Acceptable") self.send_header("Content-Type", "text/plain") self.end_headers() if parsing_error: message = f"Error parsing header: {parsing_error}\n" else: message = "None of the available codings are accepted by this client.\n" accept = self.headers.get("Accept") if accept is not None: message += f"`Accept` header was {accept!r}.\n" accept_encoding = self.headers.get("Accept-Encoding") if accept_encoding is not None: message += f"`Accept-Encoding` header was {accept_encoding!r}.\n" self.wfile.write(bytes(message, "utf-8")) def do_GET(self): # HTTP/1.0 requests don't get chunked responses if self.request_version == "HTTP/1.0": self.protocol_version = "HTTP/1.0" chunked = False else: self.protocol_version = "HTTP/1.1" chunked = True # if client's intent cannot be derived from the headers, return # uncompressed data for HTTP/1.0 requests and compressed data for # HTTP/1.1 requests with the safest compression format choice: "gzip". default_compression = ( "identity" if self.request_version == "HTTP/1.0" or ("gzip" not in AVAILABLE_CODINGS) else "gzip" ) try: compression = pick_compression( self.headers, AVAILABLE_IPC_CODECS, AVAILABLE_CODINGS, default_compression, ) if compression is None: self._send_not_acceptable() return except ValueError as e: self._send_not_acceptable(str(e)) return ### in a real application the data would be resolved from a database or ### another source like a file and error handling would be done here ### before the 200 OK response starts being sent to the client. source = self._resolve_batches() self.send_response(200) ### set these headers if testing with a local browser-based client: # self.send_header('Access-Control-Allow-Origin', 'http://localhost:8008') # self.send_header('Access-Control-Allow-Methods', 'GET') # self.send_header('Access-Control-Allow-Headers', 'Content-Type') self.send_header( "Content-Type", ( f"{ARROW_STREAM_FORMAT}; codecs={compression[9:]}" if compression.startswith("identity+") else ARROW_STREAM_FORMAT ), ) # suggest a default filename in case this response is saved by the user self.send_header("Content-Disposition", r'attachment; filename="output.arrows"') if not compression.startswith("identity"): self.send_header("Content-Encoding", compression) if chunked: self.send_header("Transfer-Encoding", "chunked") self.end_headers() for buffer in generate_chunk_buffers(the_schema, source, compression): self.wfile.write(f"{len(buffer):X}\r\n".encode("utf-8")) self.wfile.write(buffer) self.wfile.write("\r\n".encode("utf-8")) self.wfile.write("0\r\n\r\n".encode("utf-8")) else: self.end_headers() sink = SocketWriterSink(self.wfile) for buffer in generate_chunk_buffers(the_schema, source, compression): sink.write(buffer) print("Generating example data...") all_tickers = example_tickers(60) all_batches = example_batches(all_tickers) server_address = ("localhost", 8008) try: httpd = HTTPServer(server_address, MyRequestHandler) print(f"Serving on {server_address[0]}:{server_address[1]}...") httpd.serve_forever() except KeyboardInterrupt: print("Shutting down server") httpd.socket.close()