in src/write.jl [138:205]
function Base.open(
::Type{Writer},
io::T,
compress::Union{Nothing,Symbol,LZ4FrameCompressor,ZstdCompressor},
writetofile::Bool,
largelists::Bool,
denseunions::Bool,
dictencode::Bool,
dictencodenested::Bool,
alignment::Integer,
maxdepth::Integer,
ntasks::Integer,
meta::Union{Nothing,Any},
colmeta::Union{Nothing,Any},
closeio::Bool,
) where {T<:IO}
if compress isa Symbol && compress !== :lz4 && compress !== :zstd
throw(
ArgumentError(
"unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`",
),
)
end
sync = OrderedSynchronizer(2)
msgs = Channel{Message}(ntasks)
schema = Ref{Tables.Schema}()
firstcols = Ref{Any}()
dictencodings = Dict{Int64,Any}()
blocks = (Block[], Block[])
threaded = Threads.nthreads() > 1
task =
threaded ? (@wkspawn for msg in msgs
Base.write(io, msg, blocks, schema, alignment)
end) : (@async for msg in msgs
Base.write(io, msg, blocks, schema, alignment)
end)
anyerror = Threads.Atomic{Bool}(false)
errorref = Ref{Any}()
meta = _normalizemeta(meta)
colmeta = _normalizecolmeta(colmeta)
return Writer{T}(
io,
closeio,
compress,
writetofile,
largelists,
denseunions,
dictencode,
dictencodenested,
threaded,
alignment,
maxdepth,
meta,
colmeta,
sync,
msgs,
schema,
firstcols,
dictencodings,
blocks,
task,
anyerror,
errorref,
1,
false,
)
end