internal/repo/directory.go (129 lines of code) (raw):

package repo

import (
	"errors"
	"fmt"
	"strings"

	"github.com/GoogleCloudPlatform/gcs-metadata-server/internal/model"
)

// Directory implements DirectoryRepository on top of the embedded *Database.
type Directory struct {
	*Database
}

// DirectoryRepository lists the operations available on the directory table.
type DirectoryRepository interface {
	Insert(dir model.Directory) error
	Delete(bucket string, name string) error
	UpsertParentDirs(storageClass StorageClass, bucket string, objName string, newSize int64, newCount int64) error
	UpsertArchiveParentDirs(oldStorageClass StorageClass, newStorageClass StorageClass, bucket, objName string, size int64) error
}

// NewDirectoryRepository returns a DirectoryRepository that operates on db.
func NewDirectoryRepository(db *Database) DirectoryRepository {
	return &Directory{Database: db}
}

// getParentDir returns the parent directory of dir, always ending in "/".
//
// A single trailing slash on dir is ignored, so "a/b/" and "a/b" both yield
// "a/". The empty string, the root "/", and any name without a remaining
// slash all yield the root "/".
func getParentDir(dir string) string {
	// Drop one trailing slash so a directory name and a file name are handled alike.
	path := strings.TrimSuffix(dir, "/")

	// The last remaining slash closes the parent's name; keep it in the result.
	if cut := strings.LastIndex(path, "/"); cut >= 0 {
		return path[:cut+1]
	}

	// No slash left: dir was empty, the root itself, or an entry directly under the root.
	return "/"
}

// UpsertArchiveParentDirs reallocates storage class size on all parent directories for an object update by object versioning.
//
// If directories do not exist, they will be created using newStorageClass and a default count of 1
// as a safeguard for dirty reads during seeding process.
func (d *Directory) UpsertArchiveParentDirs(oldStorageClass StorageClass, newStorageClass StorageClass, bucket, objName string, size int64) error { oldStorageColumn := "size_" + strings.ToLower(string(oldStorageClass)) newStorageColumn := "size_" + strings.ToLower(string(newStorageClass)) query := fmt.Sprintf(` INSERT INTO directory (bucket, name, %[2]s, parent, count) VALUES ($1, $2, $3, $4, 1) ON CONFLICT(bucket, name) DO UPDATE SET %[1]s = %[1]s - $3, %[2]s = %[2]s + $3; `, oldStorageColumn, newStorageColumn) if len(bucket) == 0 || len(objName) == 0 { return errors.New("bucket or name argument is empty") } dirName := getParentDir(objName) tx, err := d.DB.Begin() if err != nil { return err } defer tx.Rollback() // no-op if commit succeeds for { if _, err = tx.Exec(query, bucket, dirName, size, getParentDir(dirName)); err != nil { return err } // Last directory to update is root if dirName == "/" { break } dirName = getParentDir(dirName) } return tx.Commit() } // UpsertParentDirs updates all parent directories of an object name in one transaction func (d *Directory) UpsertParentDirs(storageClass StorageClass, bucket string, objName string, newSize int64, newCount int64) error { storageColumn := "size_" + strings.ToLower(string(storageClass)) query := fmt.Sprintf(` INSERT INTO directory (bucket, name, %[1]s, count, parent) VALUES ($1, $2, $3, $4, $5) ON CONFLICT(bucket, name) DO UPDATE SET %[1]s = %[1]s + $3, count = count + $4; `, storageColumn) if len(bucket) == 0 || len(objName) == 0 { return errors.New("bucket or name argument is empty") } dirName := getParentDir(objName) tx, err := d.DB.Begin() if err != nil { return err } defer tx.Rollback() // no-op if commit succeeds for { if _, err = tx.Exec(query, bucket, dirName, newSize, newCount, getParentDir(dirName)); err != nil { return err } // Last directory to update is root if dirName == "/" { break } dirName = getParentDir(dirName) } if err := tx.Commit(); err != nil { return err } return nil } // Insert a 
single directory func (d *Directory) Insert(dir model.Directory) error { query := ` INSERT INTO directory (bucket, name, parent) VALUES (?, ?, ?) ` if len(dir.Name) == 0 || len(dir.Bucket) == 0 { return errors.New("bucket or name argument is empty") } parentDir := getParentDir(dir.Name) if _, err := d.DB.Exec(query, dir.Bucket, dir.Name, parentDir); err != nil { return err } return nil } // Delete a single directory func (d *Directory) Delete(bucket string, name string) error { query := ` DELETE FROM directory WHERE bucket = ? AND name = ?; ` res, err := d.DB.Exec(query, bucket, name) if err != nil { return err } rowsAffected, err := res.RowsAffected() if err != nil { return err } if rowsAffected == 0 { return errors.New("no rows affected") } return nil }