function append()

in src/append.jl [145:254]


function append(
    io::IO,
    source,
    arrow_schema,
    compress,
    largelists,
    denseunions,
    dictencode,
    dictencodenested,
    alignment,
    maxdepth,
    ntasks,
    meta,
    colmeta,
)
    seekend(io)
    skip(io, -8) 

    sch = Ref{Tables.Schema}(arrow_schema)
    sync = OrderedSynchronizer()
    msgs = Channel{Message}(ntasks)
    dictencodings = Dict{Int64,Any}() 
    
    blocks = (Block[], Block[])
    
    threaded = ntasks > 1
    tsk =
        threaded ? (@wkspawn for msg in msgs
            Base.write(io, msg, blocks, sch, alignment)
        end) : (@async for msg in msgs
            Base.write(io, msg, blocks, sch, alignment)
        end)
    anyerror = Threads.Atomic{Bool}(false)
    errorref = Ref{Any}()
    @sync for (i, tbl) in enumerate(Tables.partitions(source))
        if anyerror[]
            @error "error writing arrow data on partition = $(errorref[][3])" exception =
                (errorref[][1], errorref[][2])
            error("fatal error writing arrow data")
        end
        @debug "processing table partition i = $i"
        tbl_cols = Tables.columns(tbl)
        tbl_schema = Tables.schema(tbl_cols)

        if !is_equivalent_schema(arrow_schema, tbl_schema)
            throw(ArgumentError("Table schema does not match existing arrow file schema"))
        end

        if threaded
            @wkspawn process_partition(
                tbl_cols,
                dictencodings,
                largelists,
                compress,
                denseunions,
                dictencode,
                dictencodenested,
                maxdepth,
                sync,
                msgs,
                alignment,
                i,
                sch,
                errorref,
                anyerror,
                meta,
                colmeta,
            )
        else
            @async process_partition(
                tbl_cols,
                dictencodings,
                largelists,
                compress,
                denseunions,
                dictencode,
                dictencodenested,
                maxdepth,
                sync,
                msgs,
                alignment,
                i,
                sch,
                errorref,
                anyerror,
                meta,
                colmeta,
            )
        end
    end
    if anyerror[]
        @error "error writing arrow data on partition = $(errorref[][3])" exception =
            (errorref[][1], errorref[][2])
        error("fatal error writing arrow data")
    end
    
    close(msgs)
    
    wait(tsk)

    Base.write(
        io,
        Message(UInt8[], nothing, 0, true, false, Meta.Schema),
        blocks,
        sch,
        alignment,
    )

    return io
end