in src/write.jl [741:798]
function makerecordbatch(
b,
sch::Tables.Schema{names,types},
columns,
alignment,
) where {names,types}
nrows = Tables.rowcount(columns)
compress = nothing
fieldnodes = FieldNode[]
fieldbuffers = Buffer[]
bufferoffset = 0
for col in Tables.Columns(columns)
if col isa Compressed
compress = compressiontype(col)
end
bufferoffset =
makenodesbuffers!(col, fieldnodes, fieldbuffers, bufferoffset, alignment)
end
@debug "building record batch message: nrows = $nrows, sch = $sch, compress = $compress"
FN = length(fieldnodes)
Meta.recordBatchStartNodesVector(b, FN)
for fn in Iterators.reverse(fieldnodes)
Meta.createFieldNode(b, fn.length, fn.null_count)
end
nodes = FlatBuffers.endvector!(b, FN)
bodylen = 0
BN = length(fieldbuffers)
Meta.recordBatchStartBuffersVector(b, BN)
for buf in Iterators.reverse(fieldbuffers)
Meta.createBuffer(b, buf.offset, buf.length)
bodylen += padding(buf.length, alignment)
end
buffers = FlatBuffers.endvector!(b, BN)
if compress !== nothing
Meta.bodyCompressionStart(b)
Meta.bodyCompressionAddCodec(b, compress)
Meta.bodyCompressionAddMethod(b, Meta.BodyCompressionMethod.BUFFER)
compression = Meta.bodyCompressionEnd(b)
else
compression = FlatBuffers.UOffsetT(0)
end
@debug "built record batch message: nrows = $nrows, nodes = $fieldnodes, buffers = $fieldbuffers, compress = $compress, bodylen = $bodylen"
Meta.recordBatchStart(b)
Meta.recordBatchAddLength(b, Int64(nrows))
Meta.recordBatchAddNodes(b, nodes)
Meta.recordBatchAddBuffers(b, buffers)
Meta.recordBatchAddCompression(b, compression)
return Meta.recordBatchEnd(b), bodylen
end