in table/transaction.go [185:270]
// ReplaceDataFiles stages a single snapshot update on the transaction that
// removes the data files at the paths in filesToDelete and adds the data files
// at the paths in filesToAdd. The update is built with mergeOverwrite and then
// applied to the transaction via t.apply.
//
// If filesToDelete is empty and filesToAdd is not, the call is delegated to
// AddFiles. Otherwise the following must hold, or an error is returned:
//   - paths within filesToDelete, and within filesToAdd, are each unique;
//   - the table has a current snapshot (an error wrapping ErrInvalidOperation
//     is returned otherwise);
//   - every path in filesToDelete is a data file of the current snapshot (the
//     error lists the paths that were not found);
//   - no path in filesToAdd is already a data file of the current snapshot.
//
// If the table has no name mapping yet, one derived from the current schema is
// stored under DefaultNameMappingKey before the snapshot update is built.
//
// NOTE(review): when both slices are empty, validation passes and an update
// with no file changes is still built and applied — confirm this is intended.
func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, filesToAdd []string, snapshotProps iceberg.Properties) error {
	// A pure addition needs none of the deletion bookkeeping below.
	if len(filesToDelete) == 0 && len(filesToAdd) > 0 {
		return t.AddFiles(ctx, filesToAdd, snapshotProps, false)
	}

	setToDelete := make(map[string]struct{}, len(filesToDelete))
	for _, f := range filesToDelete {
		setToDelete[f] = struct{}{}
	}
	if len(setToDelete) != len(filesToDelete) {
		return errors.New("delete file paths must be unique for ReplaceDataFiles")
	}

	setToAdd := make(map[string]struct{}, len(filesToAdd))
	for _, f := range filesToAdd {
		setToAdd[f] = struct{}{}
	}
	if len(setToAdd) != len(filesToAdd) {
		return errors.New("add file paths must be unique for ReplaceDataFiles")
	}

	s := t.meta.currentSnapshot()
	if s == nil {
		return fmt.Errorf("%w: cannot replace files in a table without an existing snapshot", ErrInvalidOperation)
	}

	// Scan the current snapshot once: collect the files to delete and reject
	// any file to add that the table already references.
	markedForDeletion := make([]iceberg.DataFile, 0, len(setToDelete))
	for df, err := range s.dataFiles(t.tbl.fs, nil) {
		if err != nil {
			return fmt.Errorf("reading data files of current snapshot: %w", err)
		}
		// Honor cancellation between files; the scan covers every data file.
		if err := ctx.Err(); err != nil {
			return err
		}

		path := df.FilePath()
		if _, ok := setToDelete[path]; ok {
			markedForDeletion = append(markedForDeletion, df)
		}
		if _, ok := setToAdd[path]; ok {
			return fmt.Errorf("cannot add files that are already referenced by table, files: %s", path)
		}
	}

	if len(markedForDeletion) != len(setToDelete) {
		// Tell the caller exactly which requested paths were not found,
		// preserving the order in which they were passed.
		found := make(map[string]struct{}, len(markedForDeletion))
		for _, df := range markedForDeletion {
			found[df.FilePath()] = struct{}{}
		}
		var missing []string
		for _, p := range filesToDelete {
			if _, ok := found[p]; !ok {
				missing = append(missing, p)
			}
		}
		return fmt.Errorf("cannot delete files that do not belong to the table: %q", missing)
	}

	if t.meta.NameMapping() == nil {
		mappingJSON, err := json.Marshal(t.meta.CurrentSchema().NameMapping())
		if err != nil {
			return fmt.Errorf("marshaling default name mapping: %w", err)
		}
		if err := t.SetProperties(iceberg.Properties{DefaultNameMappingKey: string(mappingJSON)}); err != nil {
			return err
		}
	}

	commitUUID := uuid.New()
	updater := t.updateSnapshot(snapshotProps).mergeOverwrite(&commitUUID)
	for _, df := range markedForDeletion {
		updater.deleteDataFile(df)
	}
	for df, err := range filesToDataFiles(ctx, t.tbl.fs, t.meta, slices.Values(filesToAdd)) {
		if err != nil {
			return err
		}
		updater.appendDataFile(df)
	}

	updates, reqs, err := updater.commit()
	if err != nil {
		return err
	}
	return t.apply(updates, reqs)
}