http/get_simple/go/server/server.go (81 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "fmt" "log" "math/rand" "net/http" "github.com/apache/arrow/go/v15/arrow" "github.com/apache/arrow/go/v15/arrow/array" "github.com/apache/arrow/go/v15/arrow/ipc" "github.com/apache/arrow/go/v15/arrow/memory" ) var schema = arrow.NewSchema([]arrow.Field{ {Name: "a", Type: arrow.PrimitiveTypes.Int64}, {Name: "b", Type: arrow.PrimitiveTypes.Int64}, {Name: "c", Type: arrow.PrimitiveTypes.Int64}, {Name: "d", Type: arrow.PrimitiveTypes.Int64}, }, nil) func GetPutData() []arrow.Record { const ( totalRecords = 100000000 length = 4096 ncolumns = 4 seed = 42 ) var ( r = rand.New(rand.NewSource(seed)) mem = memory.DefaultAllocator arrs = make([]arrow.Array, 0, ncolumns) ) for i := 0; i < ncolumns; i++ { buf := memory.NewResizableBuffer(mem) buf.Resize(length * 8) _, err := r.Read(buf.Buf()) if err != nil { panic(err) } defer buf.Release() data := array.NewData(arrow.PrimitiveTypes.Int64, length, []*memory.Buffer{nil, buf}, nil, 0, 0) defer data.Release() a := array.NewInt64Data(data) defer a.Release() arrs = append(arrs, a) } batch := array.NewRecord(schema, arrs, length) defer batch.Release() batches := make([]arrow.Record, 0) records := int64(0) for records < totalRecords { if records+length > totalRecords { lastLen := totalRecords - records batches = append(batches, batch.NewSlice(0, lastLen)) records += lastLen } else { batch.Retain() batches = append(batches, batch) records += length } } return batches } func main() { batches := GetPutData() http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { w.WriteHeader(http.StatusBadRequest) return } hdrs := w.Header() //// set this header to disable chunked transfer encoding: //hdrs.Add("Transfer-Encoding", "identity") //// set these headers if testing with a local browser-based client: //hdrs.Add("Access-Control-Allow-Origin", "http://localhost:8008") //hdrs.Add("Access-Control-Allow-Methods", "GET") //hdrs.Add("Access-Control-Allow-Headers", "content-type") hdrs.Add("Content-Type", "application/vnd.apache.arrow.stream") w.WriteHeader(http.StatusOK) wr := ipc.NewWriter(w, ipc.WithSchema(batches[0].Schema())) defer wr.Close() for _, b := range batches { if err := wr.Write(b); err != nil { panic(err) } } }) fmt.Println("Serving on localhost:8008...") log.Fatal(http.ListenAndServe(":8008", nil)) }