in src/append.jl [145:254]
# Append the partitions of `source` (any Tables.jl-compatible, possibly partitioned
# table) onto an existing arrow stream already written to `io`.
#
# Each table partition must match the existing stream's schema (`arrow_schema`);
# otherwise an ArgumentError is thrown. Partitions are serialized concurrently
# (when `ntasks > 1`) by `process_partition` producer tasks, which feed encoded
# `Message`s through a bounded Channel to a single writer task, so bytes are
# appended to `io` by exactly one task. Returns `io`.
#
# Arguments mirror the write path: `compress`, `largelists`, `denseunions`,
# `dictencode`, `dictencodenested`, `alignment`, `maxdepth` control encoding;
# `ntasks` bounds concurrency; `meta`/`colmeta` supply table/column metadata.
function append(
    io::IO,
    source,
    arrow_schema,
    compress,
    largelists,
    denseunions,
    dictencode,
    dictencodenested,
    alignment,
    maxdepth,
    ntasks,
    meta,
    colmeta,
)
    # Position just before the last 8 bytes of the existing stream — presumably
    # the end-of-stream marker (0xFFFFFFFF continuation + 0 length), so the new
    # record batches overwrite it; a fresh marker is written at the end below.
    # NOTE(review): confirm the trailing-8-byte assumption against the writer.
    seekend(io)
    skip(io, -8)
    # Schema carried in a Ref so the writer can read/update it as messages flow.
    sch = Ref{Tables.Schema}(arrow_schema)
    # presumably orders concurrently-produced partition messages back into
    # partition order before they reach the channel — verify in OrderedSynchronizer
    sync = OrderedSynchronizer()
    # Bounded channel: producers block once `ntasks` messages are queued,
    # providing backpressure against the single writer task.
    msgs = Channel{Message}(ntasks)
    # Shared per-column dictionary-encoding state across partitions.
    dictencodings = Dict{Int64,Any}()
    # Two Block lists accumulated while writing — presumably record-batch blocks
    # and dictionary blocks — TODO confirm against `Base.write(io, ::Message, ...)`.
    blocks = (Block[], Block[])
    threaded = ntasks > 1
    # Single writer task drains the channel and performs all actual I/O on `io`.
    # `@wkspawn` (project macro) runs it on a worker thread when threaded;
    # otherwise a cooperative `@async` task suffices.
    tsk =
        threaded ? (@wkspawn for msg in msgs
            Base.write(io, msg, blocks, sch, alignment)
        end) : (@async for msg in msgs
            Base.write(io, msg, blocks, sch, alignment)
        end)
    # Producer tasks record failures here instead of throwing across task
    # boundaries: errorref holds (exception, backtrace, partition index).
    anyerror = Threads.Atomic{Bool}(false)
    errorref = Ref{Any}()
    # @sync waits for all spawned producer tasks before proceeding.
    @sync for (i, tbl) in enumerate(Tables.partitions(source))
        # Fail fast: stop launching new partitions once any producer errored.
        if anyerror[]
            @error "error writing arrow data on partition = $(errorref[][3])" exception =
                (errorref[][1], errorref[][2])
            error("fatal error writing arrow data")
        end
        @debug "processing table partition i = $i"
        tbl_cols = Tables.columns(tbl)
        tbl_schema = Tables.schema(tbl_cols)
        # Appended data must be schema-compatible with the existing stream.
        if !is_equivalent_schema(arrow_schema, tbl_schema)
            throw(ArgumentError("Table schema does not match existing arrow file schema"))
        end
        # Spawn a producer per partition; identical arguments in both branches,
        # differing only in worker-thread vs. cooperative scheduling.
        if threaded
            @wkspawn process_partition(
                tbl_cols,
                dictencodings,
                largelists,
                compress,
                denseunions,
                dictencode,
                dictencodenested,
                maxdepth,
                sync,
                msgs,
                alignment,
                i,
                sch,
                errorref,
                anyerror,
                meta,
                colmeta,
            )
        else
            @async process_partition(
                tbl_cols,
                dictencodings,
                largelists,
                compress,
                denseunions,
                dictencode,
                dictencodenested,
                maxdepth,
                sync,
                msgs,
                alignment,
                i,
                sch,
                errorref,
                anyerror,
                meta,
                colmeta,
            )
        end
    end
    # Re-check after all producers finished: an error in the final partition(s)
    # would not have been caught by the in-loop check above.
    if anyerror[]
        @error "error writing arrow data on partition = $(errorref[][3])" exception =
            (errorref[][1], errorref[][2])
        error("fatal error writing arrow data")
    end
    # Closing the channel lets the writer's `for msg in msgs` loop terminate;
    # wait for it so all queued messages are flushed before the final marker.
    close(msgs)
    wait(tsk)
    # Write a final sentinel message (empty body, `isend=true` — presumably the
    # end-of-stream marker replacing the one seeked over above — TODO confirm
    # the Message field semantics against its definition).
    Base.write(
        io,
        Message(UInt8[], nothing, 0, true, false, Meta.Schema),
        blocks,
        sch,
        alignment,
    )
    return io
end