function Base.iterate()

in src/table.jl [149:289]


"""
    Base.iterate(x::Stream, (pos, id)=(1, 0))

Read messages from `x` until the next record batch and return
`(table, (pos, id))`, or `nothing` once `Base.isdone(x)` reports that every entry
of `x.inputs` has been consumed.

Messages are handled by header type:

- `Meta.Schema`: the first one seen is stored in `x.schema` and used to fill
  `x.names`, `x.types` and `x.dictencoded`; any later one must compare equal to
  `x.schema`.
- `Meta.DictionaryBatch`: the decoded values either replace the entry for the
  batch's id in `x.dictencodings` or, for a delta batch whose id already exists,
  are appended to the existing entry.
- `Meta.RecordBatch`: its columns are collected and returned as a `Table`.

When the current input yields no more messages the next entry of `x.inputs` is
opened. On exhaustion `x.inputindex` is set back to `1` and `x.batchiterator` to
`nothing` before `nothing` is returned.

# Throws
- `ArgumentError` if the first message is not a schema message, if a later schema
  differs from `x.schema`, if the message type is unsupported, or if the
  compression codec is neither ZSTD nor LZ4_FRAME.
"""
function Base.iterate(x::Stream, (pos, id)=(1, 0))
    if Base.isdone(x)
        # reset the cursor fields before signalling completion
        x.inputindex = 1
        x.batchiterator = nothing
        return nothing
    end
    if isnothing(x.batchiterator)
        # no input is open yet: open the one at the current index
        blob = x.inputs[x.inputindex]
        x.batchiterator = BatchIterator(blob)
        pos = x.batchiterator.startpos
    end

    columns = AbstractVector[]
    compression = nothing

    while true
        state = iterate(x.batchiterator, (pos, id))
        # current input produced nothing: move on to the following inputs
        while state === nothing
            x.inputindex += 1
            if Base.isdone(x)
                x.inputindex = 1
                x.batchiterator = nothing
                return nothing
            end
            blob = x.inputs[x.inputindex]
            x.batchiterator = BatchIterator(blob)
            pos = x.batchiterator.startpos
            state = iterate(x.batchiterator, (pos, id))
        end
        batch, (pos, id) = state
        header = batch.msg.header
        if isnothing(x.schema) && !isa(header, Meta.Schema)
            throw(ArgumentError("first arrow ipc message MUST be a schema message"))
        end
        if header isa Meta.Schema
            if isnothing(x.schema)
                x.schema = header
                for field in x.schema.fields
                    push!(x.names, Symbol(field.name))
                    push!(
                        x.types,
                        juliaeltype(field, buildmetadata(field.custom_metadata), x.convert),
                    )
                    # register the dictionaries referenced by this field
                    getdictionaries!(x.dictencoded, field)
                    @debug "parsed column from schema: field = $field"
                end
            elseif header != x.schema
                throw(
                    ArgumentError(
                        "mismatched schemas between different arrow batches: $(x.schema) != $header",
                    ),
                )
            end
        elseif header isa Meta.DictionaryBatch
            id = header.id
            recordbatch = header.data
            @debug "parsing dictionary batch message: id = $id, compression = $(recordbatch.compression)"
            if recordbatch.compression !== nothing
                compression = recordbatch.compression
            end
            # `@lock` runs its body inside `try`/`finally`, which opens a new local
            # scope. Declare the decoded values out here so the `@debug` after the
            # lock sees them instead of the global `values` function.
            local dictvalues
            @lock x.dictencodings begin
                dictencodings = x.dictencodings[]
                field = x.dictencoded[id]
                dictvalues, _, _, _ = build(
                    field,
                    field.type,
                    batch,
                    recordbatch,
                    x.dictencodings,
                    Int64(1),
                    Int64(1),
                    Int64(1),
                    x.convert,
                )
                if haskey(dictencodings, id) && header.isDelta
                    # delta batch: extend the existing dictionary, then read the
                    # next message (the `finally` of `@lock` still unlocks)
                    append!(dictencodings[id].data, dictvalues)
                    continue
                end
                # new dictionary, or replacement of an existing one
                A = ChainedVector([dictvalues])
                S =
                    field.dictionary.indexType === nothing ? Int32 :
                    juliaeltype(field, field.dictionary.indexType, false)
                dictencodings[id] = DictEncoding{eltype(A),S,typeof(A)}(
                    id,
                    A,
                    field.dictionary.isOrdered,
                    dictvalues.metadata,
                )
            end # lock
            @debug "parsed dictionary batch message: id=$id, data=$dictvalues\n"
        elseif header isa Meta.RecordBatch
            @debug "parsing record batch message: compression = $(header.compression)"
            if header.compression !== nothing
                compression = header.compression
            end
            for vec in VectorIterator(x.schema, batch, x.dictencodings, x.convert)
                push!(columns, vec)
            end
            # a record batch completes one table; stop reading messages
            break
        else
            throw(ArgumentError("unsupported arrow message type: $(typeof(header))"))
        end
    end

    # record the last compression codec seen while reading this batch
    if compression !== nothing
        if compression.codec == Flatbuf.CompressionType.ZSTD
            x.compression[] = :zstd
        elseif compression.codec == Flatbuf.CompressionType.LZ4_FRAME
            x.compression[] = :lz4
        else
            throw(ArgumentError("unsupported compression codec: $(compression.codec)"))
        end
    end

    lookup = Dict{Symbol,AbstractVector}()
    types = Type[]
    for (nm, col) in zip(x.names, columns)
        lookup[nm] = col
        push!(types, eltype(col))
    end
    return Table(x.names, types, columns, lookup, Ref(x.schema)), (pos, id)
end