arrow/array/json_reader.go (134 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package array
import (
"errors"
"fmt"
"io"
"sync/atomic"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/json"
)
type (
Option func(config)
config interface{}
)
// WithChunk sets the chunk size for reading in json records. The default is to
// read in one row per record batch as a single object. If chunk size is set to
// a negative value, then the entire file is read as a single record batch.
// Otherwise a record batch is read in with chunk size rows per record batch until
// it reaches EOF.
func WithChunk(n int) Option {
return func(cfg config) {
switch cfg := cfg.(type) {
case *JSONReader:
cfg.chunk = n
default:
panic(fmt.Errorf("arrow/json): unknown config type %T", cfg))
}
}
}
// WithAllocator specifies the allocator to use for creating the record batches,
// if it is not called, then memory.DefaultAllocator will be used.
func WithAllocator(mem memory.Allocator) Option {
return func(cfg config) {
switch cfg := cfg.(type) {
case *JSONReader:
cfg.mem = mem
default:
panic(fmt.Errorf("arrow/json): unknown config type %T", cfg))
}
}
}
// JSONReader is a json reader that meets the RecordReader interface definition.
//
// To read in an array of objects as a record, you can use RecordFromJSON
// which is equivalent to reading the json as a struct array whose fields are
// the columns of the record. This primarily exists to fit the RecordReader
// interface as a matching reader for the csv reader.
type JSONReader struct {
r *json.Decoder
schema *arrow.Schema
bldr *RecordBuilder
refs atomic.Int64
cur arrow.Record
err error
chunk int
done bool
mem memory.Allocator
next func() bool
}
// NewJSONReader returns a json RecordReader which expects to find one json object
// per row of dataset. Using WithChunk can control how many rows are processed
// per record, which is how many objects become a single record from the file.
//
// If it is desired to write out an array of rows, then simply use RecordToStructArray
// and json.Marshal the struct array for the same effect.
func NewJSONReader(r io.Reader, schema *arrow.Schema, opts ...Option) *JSONReader {
rr := &JSONReader{
r: json.NewDecoder(r),
schema: schema,
chunk: 1,
}
rr.refs.Add(1)
for _, o := range opts {
o(rr)
}
if rr.mem == nil {
rr.mem = memory.DefaultAllocator
}
rr.bldr = NewRecordBuilder(rr.mem, schema)
switch {
case rr.chunk < 0:
rr.next = rr.nextall
case rr.chunk > 1:
rr.next = rr.nextn
default:
rr.next = rr.next1
}
return rr
}
// Err returns the last encountered error
func (r *JSONReader) Err() error { return r.err }
func (r *JSONReader) Schema() *arrow.Schema { return r.schema }
// Record returns the last read in record. The returned record is only valid
// until the next call to Next unless Retain is called on the record itself.
func (r *JSONReader) Record() arrow.Record { return r.cur }
func (r *JSONReader) Retain() {
r.refs.Add(1)
}
func (r *JSONReader) Release() {
debug.Assert(r.refs.Load() > 0, "too many releases")
if r.refs.Add(-1) == 0 {
if r.cur != nil {
r.cur.Release()
r.bldr.Release()
r.r = nil
}
}
}
// Next returns true if it read in a record, which will be available via Record
// and false if there is either an error or the end of the reader.
func (r *JSONReader) Next() bool {
if r.cur != nil {
r.cur.Release()
r.cur = nil
}
if r.err != nil || r.done {
return false
}
return r.next()
}
func (r *JSONReader) readNext() bool {
r.err = r.r.Decode(r.bldr)
if r.err != nil {
r.done = true
if errors.Is(r.err, io.EOF) {
r.err = nil
}
return false
}
return true
}
func (r *JSONReader) nextall() bool {
for r.readNext() {
}
r.cur = r.bldr.NewRecord()
return r.cur.NumRows() > 0
}
func (r *JSONReader) next1() bool {
if !r.readNext() {
return false
}
r.cur = r.bldr.NewRecord()
return true
}
func (r *JSONReader) nextn() bool {
n := 0
for i := 0; i < r.chunk && !r.done; i, n = i+1, n+1 {
if !r.readNext() {
break
}
}
if n > 0 {
r.cur = r.bldr.NewRecord()
}
return n > 0
}
var _ RecordReader = (*JSONReader)(nil)