// table/internal/interfaces.go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package internal

import (
"context"
"fmt"
"io"
"path"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/compute"
"github.com/apache/iceberg-go"
iceio "github.com/apache/iceberg-go/io"
)

// GetFile opens the given file using the provided file system.
//
// The returned FileSource abstracts away the underlying file format while
// providing utilities to read the file as Arrow record batches.
func GetFile(ctx context.Context, fs iceio.IO, dataFile iceberg.DataFile, isPosDeletes bool) (FileSource, error) {
switch dataFile.FileFormat() {
case iceberg.ParquetFile:
return &ParquetFileSource{
mem: compute.GetAllocator(ctx),
fs: fs,
file: dataFile,
}, nil
default:
return nil, fmt.Errorf("%w: only parquet format is implemented, got %s",
iceberg.ErrNotImplemented, dataFile.FileFormat())
}
}
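
// A minimal usage sketch: open a data file through GetFile, obtain a reader,
// and materialize the whole file as an arrow table. The readWholeFile name is
// illustrative only; fs and dataFile are assumed to come from the surrounding
// scan machinery.
func readWholeFile(ctx context.Context, fs iceio.IO, dataFile iceberg.DataFile) (arrow.Table, error) {
	src, err := GetFile(ctx, fs, dataFile, false)
	if err != nil {
		return nil, err
	}

	rdr, err := src.GetReader(ctx)
	if err != nil {
		return nil, err
	}
	defer rdr.Close()

	return rdr.ReadTable(ctx)
}
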
// FileSource is implemented for each file format and hands out readers for a
// single data file.
type FileSource interface {
	// GetReader opens the file and returns a format-specific FileReader.
	GetReader(context.Context) (FileReader, error)
}

// Metadata is an opaque handle for format-specific file metadata, such as the
// footer metadata of a parquet file.
type Metadata any

// FileReader reads a single data file, exposing its metadata, its schema, and
// its contents as Arrow record batches or a full table.
type FileReader interface {
	io.Closer

	// Metadata returns the format-specific file metadata.
	Metadata() Metadata
	// SourceFileSize returns the size of the underlying file in bytes.
	SourceFileSize() int64
	// Schema returns the file's schema converted to an arrow schema.
	Schema() (*arrow.Schema, error)
	// PrunedSchema takes the set of projected field IDs and returns the arrow
	// schema that represents the underlying file schema with only the projected
	// fields. It also returns the indexes of the projected columns so that
	// callers can read *only* the needed columns.
PrunedSchema(projectedIDs map[int]struct{}, mapping iceberg.NameMapping) (*arrow.Schema, []int, error)
	// GetRecords returns a record reader for only the provided columns (a nil
	// slice reads all of the columns of the underlying file). The tester, if
	// non-nil, is a function that can be used to filter parts of the file,
	// such as skipping row groups in a parquet file.
GetRecords(ctx context.Context, cols []int, tester any) (array.RecordReader, error)
// ReadTable reads the entire file and returns it as an arrow table.
ReadTable(context.Context) (arrow.Table, error)
}
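
// A hedged sketch of a projected read using the interface above: prune the
// file schema to the projected field IDs, then read only those column
// indexes. The readProjected name is illustrative; a nil tester performs no
// filtering, so every row group is read.
func readProjected(ctx context.Context, rdr FileReader, projectedIDs map[int]struct{}, mapping iceberg.NameMapping) error {
	_, cols, err := rdr.PrunedSchema(projectedIDs, mapping)
	if err != nil {
		return err
	}

	recs, err := rdr.GetRecords(ctx, cols, nil)
	if err != nil {
		return err
	}
	defer recs.Release()

	for recs.Next() {
		rec := recs.Record()
		_ = rec // process each batch of the projected columns here
	}

	return recs.Err()
}
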
// FileFormat encapsulates the format-specific operations needed to read,
// inspect statistics for, and write data files of a particular format.
type FileFormat interface {
Open(context.Context, iceio.IO, string) (FileReader, error)
PathToIDMapping(*iceberg.Schema) (map[string]int, error)
DataFileStatsFromMeta(rdr Metadata, statsCols map[int]StatisticsCollector, colMapping map[string]int) *DataFileStatistics
GetWriteProperties(iceberg.Properties) any
WriteDataFile(ctx context.Context, fs iceio.WriteFileIO, info WriteFileInfo, batches []arrow.Record) (iceberg.DataFile, error)
}
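
// A hedged sketch of the statistics path: map column paths to field IDs for
// the table schema, then derive per-file statistics from a reader's metadata.
// The collectStats name is illustrative; statsCols is assumed to be built
// elsewhere from the table's metrics configuration.
func collectStats(format FileFormat, sc *iceberg.Schema, meta Metadata,
	statsCols map[int]StatisticsCollector) (*DataFileStatistics, error) {
	colMapping, err := format.PathToIDMapping(sc)
	if err != nil {
		return nil, err
	}

	return format.DataFileStatsFromMeta(meta, statsCols, colMapping), nil
}
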
// GetFileFormat returns the FileFormat implementation for the given format,
// or nil if the format is not implemented.
func GetFileFormat(format iceberg.FileFormat) FileFormat {
switch format {
case iceberg.ParquetFile:
return parquetFormat{}
default:
return nil
}
}

// FormatFromFileName infers the file format from the file name's extension,
// returning nil if the extension is not recognized.
func FormatFromFileName(fileName string) FileFormat {
switch path.Ext(fileName) {
case ".parquet":
return parquetFormat{}
default:
return nil
}
}
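
// A small sketch of format resolution: prefer the format declared on the data
// file, fall back to the file extension, and guard against the nil result
// both helpers return for unimplemented formats. The resolveFormat name is
// illustrative.
func resolveFormat(dataFile iceberg.DataFile) (FileFormat, error) {
	format := GetFileFormat(dataFile.FileFormat())
	if format == nil {
		format = FormatFromFileName(dataFile.FilePath())
	}
	if format == nil {
		return nil, fmt.Errorf("%w: unsupported format %s",
			iceberg.ErrNotImplemented, dataFile.FileFormat())
	}

	return format, nil
}
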
// WriteFileInfo carries the inputs needed to write a single data file: the
// iceberg schema and partition spec, the destination file name, the
// statistics collectors keyed by field ID, and the format-specific write
// properties.
type WriteFileInfo struct {
FileSchema *iceberg.Schema
Spec iceberg.PartitionSpec
FileName string
StatsCols map[int]StatisticsCollector
WriteProps any
}
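
// A hedged end-to-end write sketch: resolve the format-specific write
// properties from table properties, assemble a WriteFileInfo, and write the
// batches out as a data file. The writeBatches name and its parameters are
// assumptions for illustration; statistics collection is omitted here (nil
// StatsCols), which the real call sites may not allow.
func writeBatches(ctx context.Context, fs iceio.WriteFileIO, schema *iceberg.Schema,
	spec iceberg.PartitionSpec, fileName string, props iceberg.Properties,
	batches []arrow.Record,
) (iceberg.DataFile, error) {
	format := GetFileFormat(iceberg.ParquetFile)
	info := WriteFileInfo{
		FileSchema: schema,
		Spec:       spec,
		FileName:   fileName,
		WriteProps: format.GetWriteProperties(props),
	}

	return format.WriteDataFile(ctx, fs, info, batches)
}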