function Base.open()

in src/write.jl [138:205]


# Fully-positional `open` method for `Writer`.
#
# Steps, in order:
#   1. reject a `compress` symbol other than `:lz4` / `:zstd` with an `ArgumentError`;
#   2. create the shared state (synchronizer, message channel, schema / first-column
#      refs, dictionary-encoding table, pair of block vectors);
#   3. start a task that iterates the message channel and calls
#      `Base.write(io, msg, blocks, schema, alignment)` for each message;
#   4. normalize `meta` / `colmeta` through `_normalizemeta` / `_normalizecolmeta`;
#   5. return the assembled `Writer{T}`.
#
# The consumer task is started with `@wkspawn` when Julia runs with more than one
# thread and with `@async` otherwise.
function Base.open(
    ::Type{Writer},
    io::T,
    compress::Union{Nothing,Symbol,LZ4FrameCompressor,ZstdCompressor},
    writetofile::Bool,
    largelists::Bool,
    denseunions::Bool,
    dictencode::Bool,
    dictencodenested::Bool,
    alignment::Integer,
    maxdepth::Integer,
    ntasks::Integer,
    meta::Union{Nothing,Any},
    colmeta::Union{Nothing,Any},
    closeio::Bool,
) where {T<:IO}
    # Only symbols are validated here; `nothing` and compressor objects pass through.
    if compress isa Symbol && !(compress === :lz4 || compress === :zstd)
        errtext = "unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`"
        throw(ArgumentError(errtext))
    end

    synchronizer = OrderedSynchronizer(2)
    # Channel capacity is `ntasks`.
    queue = Channel{Message}(ntasks)
    schemaref = Ref{Tables.Schema}()
    firstcolsref = Ref{Any}()
    encodings = Dict{Int64,Any}()
    blockpair = (Block[], Block[])

    # Consumer task: runs until the channel iteration ends.
    threaded = Threads.nthreads() > 1
    if threaded
        writertask = @wkspawn for message in queue
            Base.write(io, message, blockpair, schemaref, alignment)
        end
    else
        writertask = @async for message in queue
            Base.write(io, message, blockpair, schemaref, alignment)
        end
    end

    errorflag = Threads.Atomic{Bool}(false)
    errorslot = Ref{Any}()
    normmeta = _normalizemeta(meta)
    normcolmeta = _normalizecolmeta(colmeta)

    # Positional order must match the `Writer` field order.
    return Writer{T}(
        # destination and write options
        io,
        closeio,
        compress,
        writetofile,
        largelists,
        denseunions,
        dictencode,
        dictencodenested,
        threaded,
        alignment,
        maxdepth,
        normmeta,
        normcolmeta,
        # shared state used while writing
        synchronizer,
        queue,
        schemaref,
        firstcolsref,
        encodings,
        blockpair,
        writertask,
        errorflag,
        errorslot,
        # NOTE(review): presumably the initial partition counter and the
        # "already closed" flag - confirm against the `Writer` struct definition.
        1,
        false,
    )
end