in src/table.jl [149:289]
# Advance the stream to its next record batch and return it as a `Table`.
#
# Messages are pulled from the current input's `BatchIterator`; when that input
# runs out, the next entry of `x.inputs` is opened. While looking for a record
# batch:
#   * a schema message initializes `x.schema`, `x.names` and `x.types` and calls
#     `getdictionaries!` to populate `x.dictencoded`; any later schema message
#     must compare equal to the first one;
#   * a dictionary batch builds the dictionary for its id and stores it in
#     `x.dictencodings` (a delta batch for an existing id appends to it instead);
#   * a record batch is turned into columns via `VectorIterator`, ending the scan.
#
# State: `pos` is the position handed to `BatchIterator`, and `id` is the
# companion state value `BatchIterator` returns and expects back; both are
# returned next to the `Table` so iteration can resume. When every input is
# exhausted, `x.inputindex` is reset to 1, `x.batchiterator` to `nothing`, and
# `nothing` is returned.
#
# Throws `ArgumentError` if the first message is not a schema, if inputs carry
# mismatched schemas, on an unsupported message type, or on a compression codec
# other than ZSTD / LZ4_FRAME.
function Base.iterate(x::Stream, (pos, id)=(1, 0))
    # NOTE(review): presumably true when `x.inputindex` is past the last input
    # (e.g. an empty `x.inputs`) — confirm against `Base.isdone(::Stream)`.
    if Base.isdone(x)
        x.inputindex = 1
        x.batchiterator = nothing
        return nothing
    end
    if isnothing(x.batchiterator)
        blob = x.inputs[x.inputindex]
        x.batchiterator = BatchIterator(blob)
        pos = x.batchiterator.startpos
    end
    columns = AbstractVector[]
    compression = nothing
    while true
        state = iterate(x.batchiterator, (pos, id))
        # Current input exhausted: open the next one, or reset and finish.
        while state === nothing
            x.inputindex += 1
            if Base.isdone(x)
                x.inputindex = 1
                x.batchiterator = nothing
                return nothing
            end
            blob = x.inputs[x.inputindex]
            x.batchiterator = BatchIterator(blob)
            pos = x.batchiterator.startpos
            state = iterate(x.batchiterator, (pos, id))
        end
        batch, (pos, id) = state
        header = batch.msg.header
        if isnothing(x.schema) && !isa(header, Meta.Schema)
            throw(ArgumentError("first arrow ipc message MUST be a schema message"))
        end
        if header isa Meta.Schema
            if isnothing(x.schema)
                x.schema = header
                for field in x.schema.fields
                    push!(x.names, Symbol(field.name))
                    push!(
                        x.types,
                        juliaeltype(field, buildmetadata(field.custom_metadata), x.convert),
                    )
                    getdictionaries!(x.dictencoded, field)
                    @debug "parsed column from schema: field = $field"
                end
            elseif header != x.schema
                throw(
                    ArgumentError(
                        "mismatched schemas between different arrow batches: $(x.schema) != $header",
                    ),
                )
            end
        elseif header isa Meta.DictionaryBatch
            # Kept separate from the iteration state `id`: assigning the dictionary
            # id to `id` would feed it back into `BatchIterator` as its state.
            dictid = header.id
            recordbatch = header.data
            @debug "parsing dictionary batch message: id = $dictid, compression = $(recordbatch.compression)"
            if recordbatch.compression !== nothing
                compression = recordbatch.compression
            end
            @lock x.dictencodings begin
                dictencodings = x.dictencodings[]
                # A delta batch only extends a dictionary that already exists;
                # otherwise this batch defines (or replaces) the dictionary.
                isdelta = haskey(dictencodings, dictid) && header.isDelta
                field = x.dictencoded[dictid]
                dictvalues, _, _, _ = build(
                    field,
                    field.type,
                    batch,
                    recordbatch,
                    x.dictencodings,
                    Int64(1),
                    Int64(1),
                    Int64(1),
                    x.convert,
                )
                if isdelta
                    append!(dictencodings[dictid].data, dictvalues)
                else
                    A = ChainedVector([dictvalues])
                    # Dictionary index type defaults to Int32 when the schema
                    # does not specify one.
                    S =
                        field.dictionary.indexType === nothing ? Int32 :
                        juliaeltype(field, field.dictionary.indexType, false)
                    dictencodings[dictid] = DictEncoding{eltype(A),S,typeof(A)}(
                        dictid,
                        A,
                        field.dictionary.isOrdered,
                        dictvalues.metadata,
                    )
                end
                # Logged inside the block: `@lock` wraps it in `try`, which opens a
                # new scope, so `dictvalues` is not visible after the block ends.
                @debug "parsed dictionary batch message: id=$dictid, data=$dictvalues\n"
            end
        elseif header isa Meta.RecordBatch
            @debug "parsing record batch message: compression = $(header.compression)"
            if header.compression !== nothing
                compression = header.compression
            end
            for vec in VectorIterator(x.schema, batch, x.dictencodings, x.convert)
                push!(columns, vec)
            end
            break
        else
            throw(ArgumentError("unsupported arrow message type: $(typeof(header))"))
        end
    end
    # Record the last compression codec seen during this call on the stream.
    if compression !== nothing
        if compression.codec == Flatbuf.CompressionType.ZSTD
            x.compression[] = :zstd
        elseif compression.codec == Flatbuf.CompressionType.LZ4_FRAME
            x.compression[] = :lz4
        else
            throw(ArgumentError("unsupported compression codec: $(compression.codec)"))
        end
    end
    lookup = Dict{Symbol,AbstractVector}()
    types = Type[]
    for (nm, col) in zip(x.names, columns)
        lookup[nm] = col
        push!(types, eltype(col))
    end
    return Table(x.names, types, columns, lookup, Ref(x.schema)), (pos, id)
end