in table/arrow_utils.go [1257:1314]
func recordsToDataFiles(ctx context.Context, rootLocation string, meta *MetadataBuilder, args recordWritingArgs) (ret iter.Seq2[iceberg.DataFile, error]) {
if args.counter == nil {
args.counter = internal.Counter(0)
}
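// An iterator-returning function has no error return of its own, so the
// deferred recover below converts any panic raised during setup or writing
// into a single-element sequence that yields the error to the caller.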
defer func() {
if r := recover(); r != nil {
var err error
switch e := r.(type) {
case string:
err = fmt.Errorf("error encountered during file writing: %s", e)
case error:
err = fmt.Errorf("error encountered during file writing: %w", e)
default:
// recover can return any value; fall back to %v so err is never nil
err = fmt.Errorf("error encountered during file writing: %v", e)
}
ret = func(yield func(iceberg.DataFile, error) bool) {
yield(nil, err)
}
}
}()
// generate a fresh UUID to identify this write if the caller didn't supply one
if args.writeUUID == nil {
u := uuid.Must(uuid.NewRandom())
args.writeUUID = &u
}
// target size for each written data file, read from the table properties
// with the library default as fallback
targetFileSize := int64(meta.props.GetInt(WriteTargetFileSizeBytesKey,
WriteTargetFileSizeBytesDefault))
nameMapping := meta.CurrentSchema().NameMapping()
// convert the Arrow schema of the incoming records to its Iceberg equivalent
taskSchema, err := ArrowSchemaToIceberg(args.sc, false, nameMapping)
if err != nil {
// caught by the deferred recover and re-surfaced as an iterator error
panic(err)
}
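// iter.Pull converts the push-style counter into a pull-style next/stop pair;
// stopCount must be called once the tasks iterator finishes so the underlying
// coroutine is released.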
nextCount, stopCount := iter.Pull(args.counter)
if meta.CurrentSpec().IsUnpartitioned() {
// lazily build one WriteTask per bin-packed group of record batches
tasks := func(yield func(WriteTask) bool) {
defer stopCount()
// bin-pack the record stream into groups of roughly targetFileSize bytes
// (20 is presumably the packing lookback)
for batch := range binPackRecords(args.itr, 20, targetFileSize) {
cnt, _ := nextCount()
t := WriteTask{
Uuid: *args.writeUUID,
ID: cnt,
Schema: taskSchema,
Batches: batch,
}
if !yield(t) {
return
}
}
}
// hand the tasks off to the file writers, yielding the resulting data files
return writeFiles(ctx, rootLocation, args.fs, meta, tasks)
}
// partitioned writes are not yet implemented; the deferred recover turns
// this panic into an iterator error as well
panic(fmt.Errorf("%w: write stream with partitions", iceberg.ErrNotImplemented))
}
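// A minimal sketch (not part of the original file) of how a caller might
// consume the returned sequence; collectDataFiles is a hypothetical helper.
// Data files and errors are interleaved in the iter.Seq2, so both range
// values must be checked on every iteration.
func collectDataFiles(ctx context.Context, loc string, meta *MetadataBuilder, args recordWritingArgs) ([]iceberg.DataFile, error) {
var files []iceberg.DataFile
for df, err := range recordsToDataFiles(ctx, loc, meta, args) {
if err != nil {
return nil, err
}
files = append(files, df)
}
return files, nil
}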