table/writer.go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package table

import (
	"context"
	"fmt"
	"iter"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/iceberg-go"
	"github.com/apache/iceberg-go/config"
	"github.com/apache/iceberg-go/io"
	"github.com/apache/iceberg-go/table/internal"
	"github.com/google/uuid"
)
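
// WriteTask is a single unit of write work: a set of Arrow record batches,
// tagged with a UUID and task ID, to be written out as one data file.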
type WriteTask struct {
	Uuid        uuid.UUID
	ID          int
	Schema      *iceberg.Schema
	Batches     []arrow.Record
	SortOrderID int
}
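
// GenerateDataFileName builds the name of the data file produced by this
// task, e.g. "00000-7-<uuid>.parquet" for ID 7 and extension "parquet".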
func (w WriteTask) GenerateDataFileName(extension string) string {
	// Mimics the behavior in the Java API:
	// https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101
	return fmt.Sprintf("00000-%d-%s.%s", w.ID, w.Uuid, extension)
}
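
// writer writes WriteTasks out as data files, using a LocationProvider to
// choose file paths, a WriteFileIO for output, and a FileFormat to do the
// encoding.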
type writer struct {
	loc        LocationProvider
	fs         io.WriteFileIO
	fileSchema *iceberg.Schema
	format     internal.FileFormat
	props      any // format-specific write properties from format.GetWriteProperties
	meta       *MetadataBuilder
}
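
// writeFile casts the task's record batches to the writer's file schema,
// computes the column statistics plan, and writes the batches out as a
// single data file. It takes ownership of the task's batches and releases
// them when done.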
func (w *writer) writeFile(ctx context.Context, task WriteTask) (iceberg.DataFile, error) {
	// Release the incoming batches once the file has been written
	// (or an error occurs).
	defer func() {
		for _, b := range task.Batches {
			b.Release()
		}
	}()

	// Cast each batch from the task's schema to the schema the file
	// will be written with.
	batches := make([]arrow.Record, len(task.Batches))
	for i, b := range task.Batches {
		rec, err := ToRequestedSchema(ctx, w.fileSchema,
			task.Schema, b, false, true, false)
		if err != nil {
			return nil, err
		}
		batches[i] = rec
	}

	statsCols, err := computeStatsPlan(w.fileSchema, w.meta.props)
	if err != nil {
		return nil, err
	}

	filePath := w.loc.NewDataLocation(
		task.GenerateDataFileName("parquet"))

	return w.format.WriteDataFile(ctx, w.fs, internal.WriteFileInfo{
		FileSchema: w.fileSchema,
		FileName:   filePath,
		StatsCols:  statsCols,
		WriteProps: w.props,
	}, batches)
}
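
// writeFiles writes each incoming task as a Parquet data file under
// rootLocation, spreading the work across up to config.EnvConfig.MaxWorkers
// concurrent workers, and yields each resulting data file (or error).
//
// A minimal consumption sketch; ctx, rootLocation, fsys, builder, and tasks
// are assumed to exist in the caller's scope:
//
//	for df, err := range writeFiles(ctx, rootLocation, fsys, builder, tasks) {
//		if err != nil {
//			return nil, err
//		}
//		dataFiles = append(dataFiles, df)
//	}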
func writeFiles(ctx context.Context, rootLocation string, fs io.WriteFileIO, meta *MetadataBuilder, tasks iter.Seq[WriteTask]) iter.Seq2[iceberg.DataFile, error] {
	locProvider, err := LoadLocationProvider(rootLocation, meta.props)
	if err != nil {
		return func(yield func(iceberg.DataFile, error) bool) {
			yield(nil, err)
		}
	}

	format := internal.GetFileFormat(iceberg.ParquetFile)
	fileSchema := meta.CurrentSchema()
	sanitized, err := iceberg.SanitizeColumnNames(fileSchema)
	if err != nil {
		return func(yield func(iceberg.DataFile, error) bool) {
			yield(nil, err)
		}
	}

	// If sanitizing the column names produced a different schema, write
	// the data files with the sanitized schema; otherwise keep the
	// table's current schema as-is.
	if !sanitized.Equals(fileSchema) {
		fileSchema = sanitized
	}

	w := &writer{
		loc:        locProvider,
		fs:         fs,
		fileSchema: fileSchema,
		format:     format,
		props:      format.GetWriteProperties(meta.props),
		meta:       meta,
	}

	nworkers := config.EnvConfig.MaxWorkers

	return internal.MapExec(nworkers, tasks, func(t WriteTask) (iceberg.DataFile, error) {
		return w.writeFile(ctx, t)
	})
}