in src/table.jl [468:611]
# Build a `Table` from one or more Arrow IPC blobs.
#
# Messages of every blob are visited in order via `BatchIterator`:
#   * `Meta.Schema`: the first schema seen fills `names(t)` and `schema(t)` and
#     registers dictionary-encoded fields in `dictencoded`; any later schema that
#     compares `!=` to it raises an `ArgumentError`.
#   * `Meta.DictionaryBatch`: builds the dictionary values and stores a
#     `DictEncoding` under the batch's dictionary id. A delta batch for an id that
#     is already known is added to the existing dictionary data (wrapped in a
#     `ChainedVector` if it is not one yet).
#   * `Meta.RecordBatch`: columns are built on a spawned task; `sync`
#     (an `OrderedSynchronizer`) hands them to a single consumer task in message
#     order, which concatenates them into `columns(t)`.
# Any other message type, or a record batch seen before any schema message,
# raises an `ArgumentError`. When a schema but no record batch was seen, each
# column is an empty vector of the field's Julia element type.
# `convert` is forwarded to `build`, `VectorIterator` and `juliaeltype`.
function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
    t = Table()
    sch = nothing
    # dictionary id => DictEncoding; shared with record-batch tasks, hence the lock
    dictencodingslockable = Lockable(Dict{Int64,DictEncoding}())
    # dictionary id => field owning that dictionary
    dictencoded = Dict{Int64,Meta.Field}()
    # record batches are decoded concurrently; `sync` restores message order
    sync = OrderedSynchronizer()
    tsks = Channel{Any}(Inf)
    # Single consumer: the only task that mutates `columns(t)`.
    tsk = @wkspawn begin
        nbatch = 1
        for cols in tsks
            if nbatch == 1
                foreach(x -> push!(columns(t), x), cols)
            elseif nbatch == 2
                # second batch: turn every column into a ChainedVector so that
                # subsequent batches can be `append!`ed to it
                foreach(eachindex(cols)) do j
                    columns(t)[j] = ChainedVector([columns(t)[j], cols[j]])
                end
            else
                foreach(eachindex(cols)) do j
                    append!(columns(t)[j], cols[j])
                end
            end
            nbatch += 1
        end
    end
    anyrecordbatches = false
    rbi = 1
    try
        @sync for blob in blobs
            for batch in BatchIterator(blob)
                header = batch.msg.header
                if header isa Meta.Schema
                    @debug "parsing schema message"
                    if sch === nothing
                        for field in header.fields
                            push!(names(t), Symbol(field.name))
                            # collect dictionary-encoded fields into `dictencoded`
                            getdictionaries!(dictencoded, field)
                            @debug "parsed column from schema: field = $field"
                        end
                        sch = header
                        schema(t)[] = sch
                    elseif sch != header
                        throw(
                            ArgumentError(
                                "mismatched schemas between different arrow batches: $sch != $header",
                            ),
                        )
                    end
                elseif header isa Meta.DictionaryBatch
                    id = header.id
                    recordbatch = header.data
                    @debug "parsing dictionary batch message: id = $id, compression = $(recordbatch.compression)"
                    @lock dictencodingslockable begin
                        dictencodings = dictencodingslockable[]
                        # decided before `build` runs, as in the delta check it replaces
                        isdelta = haskey(dictencodings, id) && header.isDelta
                        field = dictencoded[id]
                        values, _, _, _ = build(
                            field,
                            field.type,
                            batch,
                            recordbatch,
                            dictencodingslockable,
                            Int64(1),
                            Int64(1),
                            Int64(1),
                            convert,
                        )
                        if isdelta && dictencodings[id].data isa ChainedVector
                            # existing data is already chained: extend it in place
                            append!(dictencodings[id].data, values)
                        else
                            # new/replacement dictionary, or a delta whose existing
                            # data is not chained yet (chain old data + new values)
                            A = if isdelta
                                ChainedVector([dictencodings[id].data, values])
                            else
                                values
                            end
                            S = if field.dictionary.indexType === nothing
                                Int32
                            else
                                juliaeltype(field, field.dictionary.indexType, false)
                            end
                            dictencodings[id] = DictEncoding{eltype(A),S,typeof(A)}(
                                id,
                                A,
                                field.dictionary.isOrdered,
                                values.metadata,
                            )
                        end
                        # logged inside the lock block: `@lock` wraps its body in
                        # try/finally, so `values` is not visible after it
                        @debug "parsed dictionary batch message: id=$id, data=$values\n"
                    end
                elseif header isa Meta.RecordBatch
                    # VectorIterator needs the schema; fail here with a clear message
                    # rather than inside the spawned task
                    sch === nothing && throw(
                        ArgumentError(
                            "arrow record batch encountered before any schema message",
                        ),
                    )
                    anyrecordbatches = true
                    @debug "parsing record batch message: compression = $(header.compression)"
                    @wkspawn begin
                        cols = collect(
                            VectorIterator($sch, $batch, dictencodingslockable, convert),
                        )
                        put!(() -> put!(tsks, cols), sync, $(rbi))
                    end
                    rbi += 1
                else
                    throw(ArgumentError("unsupported arrow message type: $(typeof(header))"))
                end
            end
        end
    finally
        # Always close the channel so the consumer task terminates, even when
        # parsing above threw.
        # NOTE(review): if a record-batch task throws before its `put!` on `sync`,
        # tasks with a larger `rbi` may wait on `sync` indefinitely — confirm the
        # OrderedSynchronizer failure semantics.
        close(tsks)
    end
    wait(tsk)
    lu = lookup(t)
    ty = types(t)
    # schema but no data: give every field an empty, correctly typed column
    if !anyrecordbatches && !isnothing(sch)
        for field in sch.fields
            T = juliaeltype(field, buildmetadata(field), convert)
            push!(columns(t), T[])
        end
    end
    for (nm, col) in zip(names(t), columns(t))
        lu[nm] = col
        push!(ty, eltype(col))
    end
    getfield(t, :metadata)[] = buildmetadata(sch)
    return t
end