# http/get_simple/python/server/http.server/server.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import io
from http.server import BaseHTTPRequestHandler, HTTPServer
from random import randbytes

import pyarrow as pa

# use chunked transfer encoding?
chunked_encoding = True

schema = pa.schema([
    ('a', pa.int64()),
    ('b', pa.int64()),
    ('c', pa.int64()),
    ('d', pa.int64())
])


def GetPutData():
    """Build the demo dataset: a list of record batches totaling 100M rows.

    One 4096-row batch of random Int64 data is created per the module-level
    ``schema``; the returned list repeats (zero-copy) that same batch until
    the total row count is reached, slicing the final batch if needed.
    """
    total_records = 100000000
    length = 4096
    ncolumns = 4

    # Build a single reusable batch of random 64-bit integers.
    columns = []
    for _ in range(ncolumns):
        raw = pa.py_buffer(randbytes(length * 8))
        columns.append(
            pa.Int64Array.from_buffers(pa.int64(), length, [None, raw],
                                       null_count=0))
    batch = pa.record_batch(columns, schema)

    # Repeat the batch until total_records rows are accumulated; the last
    # batch is a slice so the total is exact. Slices share the same buffers,
    # so this list is cheap despite the large logical row count.
    batches = []
    records = 0
    while records < total_records:
        remaining = total_records - records
        if remaining < length:
            batches.append(batch.slice(0, remaining))
            records += remaining
        else:
            batches.append(batch)
            records += length
    return batches


def generate_buffers(schema, source):
    """Yield each Arrow IPC stream message for *source* as a buffer view.

    A single in-memory sink is reused: before every batch the sink is rewound
    and afterwards truncated, so each yielded view contains exactly one IPC
    message (the first one also carries the schema header). After the last
    batch, closing the writer emits the end-of-stream marker, which is
    yielded as the final buffer.
    """
    with io.BytesIO() as sink, pa.ipc.new_stream(sink, schema) as writer:
        for batch in source:
            sink.seek(0)
            writer.write_batch(batch)
            sink.truncate()
            with sink.getbuffer() as view:
                yield view
        # Closing the writer appends the IPC end-of-stream marker; rewind
        # first and truncate after so the view holds only that marker.
        sink.seek(0)
        writer.close()
        sink.truncate()
        with sink.getbuffer() as view:
            yield view
# Alternative generators for the (rare) case where a single IPC message can
# exceed 2 GB — Python's http.server errors on wfile.write() calls that
# large, so each oversized buffer must be split into sub-views first:
#
# def chunk_huge_buffer(view, max_chunk_size):
#     if len(view) <= max_chunk_size:
#         yield view
#         return
#     num_splits = len(view) // max_chunk_size
#     for i in range(num_splits):
#         with view[i * max_chunk_size:i * max_chunk_size + max_chunk_size] as chunk:
#             yield chunk
#     last_chunk_size = len(view) - (num_splits * max_chunk_size)
#     if last_chunk_size > 0:
#         with view[num_splits * max_chunk_size:] as chunk:
#             yield chunk
#
# def generate_chunked_buffers(schema, source, max_chunk_size):
#     for buffer in generate_buffers(schema, source):
#         with memoryview(buffer) as view:
#             for chunk in chunk_huge_buffer(view, max_chunk_size):
#                 yield chunk


class MyServer(BaseHTTPRequestHandler):
    """Serves the module-level ``batches`` as an Arrow IPC stream over GET."""

    def resolve_batches(self):
        """Return a reader over the record batches to send to the client."""
        return pa.RecordBatchReader.from_batches(schema, batches)

    def do_GET(self):
        ### given a source of record batches, this function sends them
        ### to a client using HTTP chunked transfer encoding.
        source = self.resolve_batches()
        # Chunked transfer encoding only exists in HTTP/1.1; mirror the
        # client's protocol version and fall back to a plain close-delimited
        # body for HTTP/1.0 clients.
        if self.request_version == 'HTTP/1.0':
            self.protocol_version = 'HTTP/1.0'
            chunked = False
        else:
            self.protocol_version = 'HTTP/1.1'
            chunked = chunked_encoding
        # No Content-Length is sent, so close the connection to mark the end
        # of the response body when not using chunked encoding.
        self.close_connection = True
        self.send_response(200)
        self.send_header('Content-Type', 'application/vnd.apache.arrow.stream')
        ### set these headers if testing with a local browser-based client:
        #self.send_header('Access-Control-Allow-Origin', 'http://localhost:8008')
        #self.send_header('Access-Control-Allow-Methods', 'GET')
        #self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        ### set this header to make browsers download the file with a name and extension:
        #self.send_header('Content-Disposition', 'attachment; filename="data.arrows"')
        if chunked:
            self.send_header('Transfer-Encoding', 'chunked')
        self.end_headers()
        ### if any record batch could be larger than 2 GB, Python's
        ### http.server will error when calling self.wfile.write(),
        ### so you will need to split them into smaller chunks by
        ### using the generate_chunked_buffers() function instead
        ### of generate_buffers().
        # for buffer in generate_chunked_buffers(schema, source, int(2e9)):
        for buffer in generate_buffers(schema, source):
            if chunked:
                # Each chunk is framed as: hex length, CRLF, data, CRLF.
                self.wfile.write('{:X}\r\n'.format(len(buffer)).encode('utf-8'))
            self.wfile.write(buffer)
            if chunked:
                self.wfile.write('\r\n'.encode('utf-8'))
            self.wfile.flush()
        if chunked:
            # Zero-length chunk terminates the chunked response body.
            self.wfile.write('0\r\n\r\n'.encode('utf-8'))
            self.wfile.flush()


batches = GetPutData()

server_address = ('localhost', 8008)
# Construct the server *before* the try block so the KeyboardInterrupt
# handler can never see an unbound `httpd`; release the listening socket
# through the documented server_close() API in a finally clause.
httpd = HTTPServer(server_address, MyServer)
try:
    print(f'Serving on {server_address[0]}:{server_address[1]}...')
    httpd.serve_forever()
except KeyboardInterrupt:
    print('Shutting down server')
finally:
    httpd.server_close()