func recordsToDataFiles()

in table/arrow_utils.go [1257:1314]

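Converts a stream of Arrow record batches into Iceberg data files, returned lazily as an iter.Seq2[iceberg.DataFile, error]. Setup failures are raised as panics and turned into a single yielded error by a deferred recover; only unpartitioned tables are handled so far.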

func recordsToDataFiles(ctx context.Context, rootLocation string, meta *MetadataBuilder, args recordWritingArgs) (ret iter.Seq2[iceberg.DataFile, error]) {
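	// Fall back to a zero-based counter when the caller did not supply
	// one; it numbers the write tasks below.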
	if args.counter == nil {
		args.counter = internal.Counter(0)
	}

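	// Recover panics raised during setup (schema conversion, unsupported
	// partition specs) and surface them as a single error element in the
	// returned sequence rather than crashing the caller.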
	defer func() {
		if r := recover(); r != nil {
			var err error
			switch e := r.(type) {
			case string:
				err = fmt.Errorf("error encountered during file writing: %s", e)
			case error:
				err = fmt.Errorf("error encountered during file writing: %w", e)
			default:
				err = fmt.Errorf("error encountered during file writing: %v", e)
			}
			ret = func(yield func(iceberg.DataFile, error) bool) {
				yield(nil, err)
			}
		}
	}()

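	// Generate a write UUID when none is supplied; it identifies the
	// files produced by this write.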
	if args.writeUUID == nil {
		u := uuid.Must(uuid.NewRandom())
		args.writeUUID = &u
	}

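	// Target data file size comes from table properties, defaulting to
	// WriteTargetFileSizeBytesDefault when unset.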
	targetFileSize := int64(meta.props.GetInt(WriteTargetFileSizeBytesKey,
		WriteTargetFileSizeBytesDefault))

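	// Derive the Iceberg schema for this write from the Arrow schema of
	// the incoming records; a failure here panics and is converted to an
	// error by the deferred recover above.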
	nameMapping := meta.CurrentSchema().NameMapping()
	taskSchema, err := ArrowSchemaToIceberg(args.sc, false, nameMapping)
	if err != nil {
		panic(err)
	}

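	// Pull on the counter so each write task gets the next sequential ID.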
	nextCount, stopCount := iter.Pull(args.counter)
	if meta.CurrentSpec().IsUnpartitioned() {
		tasks := func(yield func(WriteTask) bool) {
			defer stopCount()

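			// Bin-pack the incoming records into batches of roughly
			// targetFileSize bytes; each batch becomes one WriteTask.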
			for batch := range binPackRecords(args.itr, 20, targetFileSize) {
				cnt, _ := nextCount()
				t := WriteTask{
					Uuid:    *args.writeUUID,
					ID:      cnt,
					Schema:  taskSchema,
					Batches: batch,
				}
				if !yield(t) {
					return
				}
			}
		}

		return writeFiles(ctx, rootLocation, args.fs, meta, tasks)
	}

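	// Partitioned writes are not implemented; this panic is converted to
	// a yielded error by the deferred recover above.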
	panic(fmt.Errorf("%w: write stream with partitions", iceberg.ErrNotImplemented))
}
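
A minimal consumption sketch (not from the source): collectDataFiles is a hypothetical helper, and the loc, builder, and writeArgs values are assumed to be prepared by the caller. Errors, including recovered panics, arrive as the second element of the yielded pairs, so they are checked on every iteration:

// collectDataFiles is a hypothetical helper showing how the returned
// sequence is consumed from within the table package.
func collectDataFiles(ctx context.Context, loc string, builder *MetadataBuilder, writeArgs recordWritingArgs) ([]iceberg.DataFile, error) {
	var dataFiles []iceberg.DataFile
	for df, err := range recordsToDataFiles(ctx, loc, builder, writeArgs) {
		if err != nil {
			return nil, fmt.Errorf("writing data files: %w", err)
		}
		dataFiles = append(dataFiles, df)
	}
	return dataFiles, nil
}

Yielding errors through the sequence keeps the signature a plain iterator usable with an ordinary range loop, and it is what makes the internal panic(err) calls safe.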