in src/write.jl [253:344]
# Write every table partition of `source` to `writer`.
#
# Partitions come from `Tables.partitions(source)`. The first partition (while
# `writer.firstcols` is still unassigned) is handled on the calling task by
# `_write_first_partition!`, which fixes the output schema. Every later partition
# is handed to `process_partition` on a spawned task: `@wkspawn` when
# `writer.threaded`, otherwise `@async`. That task receives the partition index
# and `writer.sync` (presumably so messages are emitted in partition order;
# see `process_partition`).
#
# `check_errors(writer)` runs before each partition and once more after `@sync`
# has waited for all spawned tasks. Returns `nothing`.
function write(writer::Writer, source)
    @sync for tbl in Tables.partitions(source)
        check_errors(writer)
        @debug "processing table partition $(writer.partition_count)"
        tblcols = Tables.columns(tbl)
        if !isassigned(writer.firstcols)
            _write_first_partition!(writer, source, tblcols)
        else
            # Snapshot the index now: `writer.partition_count` is incremented at
            # the end of this iteration, possibly before the spawned task runs.
            i = writer.partition_count
            # A single closure shared by both spawn paths, so the long argument
            # list of `process_partition` is written exactly once.
            run_partition = () -> process_partition(
                tblcols,
                writer.dictencodings,
                writer.largelists,
                writer.compress,
                writer.denseunions,
                writer.dictencode,
                writer.dictencodenested,
                writer.maxdepth,
                writer.sync,
                writer.msgs,
                writer.alignment,
                i,
                writer.schema,
                writer.errorref,
                writer.anyerror,
                writer.meta,
                writer.colmeta,
            )
            # `@async` (and presumably `@wkspawn`) registers its task with the
            # lexically enclosing `@sync`, so both spawns must stay in this loop.
            if writer.threaded
                @wkspawn run_partition()
            else
                @async run_partition()
            end
        end
        writer.partition_count += 1
    end
    check_errors(writer)
    return
end

# Process the first partition of `source` synchronously on the calling task.
#
# When `writer.writetofile` is set, `FILE_FORMAT_MAGIC_BYTES` followed by two
# zero bytes is written to `writer.io` first. (This assumes the constant is the
# 6-byte `ARROW1` magic, padded to 8 bytes per the Arrow IPC file format;
# confirm.) The columns are converted with `toarrowtable`. Table metadata comes
# from `writer.meta` when set, otherwise from `getmetadata(source)`. The
# resulting schema and columns are stored in `writer.schema[]` and
# `writer.firstcols[]`; later partitions read the schema ref and test
# `firstcols`.
#
# Messages are then queued on `writer.msgs` in this fixed order:
# 1. the schema message;
# 2. one dictionary batch per entry of `writer.dictencodings`, by descending id;
# 3. the first record batch.
function _write_first_partition!(writer::Writer, source, tblcols)
    if writer.writetofile
        @debug "starting write of arrow formatted file"
        Base.write(writer.io, FILE_FORMAT_MAGIC_BYTES, b"\0\0")
    end
    meta = isnothing(writer.meta) ? getmetadata(source) : writer.meta
    cols = toarrowtable(
        tblcols,
        writer.dictencodings,
        writer.largelists,
        writer.compress,
        writer.denseunions,
        writer.dictencode,
        writer.dictencodenested,
        writer.maxdepth,
        meta,
        writer.colmeta,
    )
    writer.schema[] = Tables.schema(cols)
    writer.firstcols[] = cols
    put!(writer.msgs, makeschemamsg(writer.schema[], cols))
    # `writer.dictencodings` was passed to `toarrowtable` above and is only
    # inspected afterwards, presumably because the conversion fills it.
    if !isempty(writer.dictencodings)
        # Emit dictionaries in descending id order. The reason for this order is
        # not visible here (possibly nested-dictionary dependencies); confirm
        # before changing it.
        des = sort!(collect(writer.dictencodings); by=x -> x.first, rev=true)
        for (id, delock) in des
            de = delock.value
            dictsch = Tables.Schema((:col,), (eltype(de.data),))
            # `false` presumably marks these as initial (non-delta) dictionaries.
            dictbatchmsg = makedictionarybatchmsg(
                dictsch,
                (col=de.data,),
                id,
                false,
                writer.alignment,
            )
            put!(writer.msgs, dictbatchmsg)
        end
    end
    recbatchmsg = makerecordbatchmsg(writer.schema[], cols, writer.alignment)
    put!(writer.msgs, recbatchmsg)
    return nothing
end