in table/transaction.go [272:328]
func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProps iceberg.Properties, ignoreDuplicates bool) error {
set := make(map[string]string)
for _, f := range files {
set[f] = f
}
if len(set) != len(files) {
return errors.New("file paths must be unique for AddFiles")
}
if !ignoreDuplicates {
if s := t.meta.currentSnapshot(); s != nil {
referenced := make([]string, 0)
for df, err := range s.dataFiles(t.tbl.fs, nil) {
if err != nil {
return err
}
if _, ok := set[df.FilePath()]; ok {
referenced = append(referenced, df.FilePath())
}
}
if len(referenced) > 0 {
return fmt.Errorf("cannot add files that are already referenced by table, files: %s", referenced)
}
}
}
if t.meta.NameMapping() == nil {
nameMapping := t.meta.CurrentSchema().NameMapping()
mappingJson, err := json.Marshal(nameMapping)
if err != nil {
return err
}
err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: string(mappingJson)})
if err != nil {
return err
}
}
updater := t.updateSnapshot(snapshotProps).fastAppend()
dataFiles := filesToDataFiles(ctx, t.tbl.fs, t.meta, slices.Values(files))
for df, err := range dataFiles {
if err != nil {
return err
}
updater.appendDataFile(df)
}
updates, reqs, err := updater.commit()
if err != nil {
return err
}
return t.apply(updates, reqs)
}