util/gcs/sort.go (101 lines of code) (raw):

/* Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package gcs import ( "context" "errors" "sort" "sync" "time" "cloud.google.com/go/storage" "github.com/fvbommel/sortorder" "github.com/sirupsen/logrus" ) // StatResult contains the result of calling Stat, including any error type StatResult struct { Attrs *storage.ObjectAttrs Err error } // Stat multiple paths using concurrent workers. Result indexes match paths. func Stat(ctx context.Context, client Stater, workers int, paths ...Path) []StatResult { var wg sync.WaitGroup wg.Add(workers) out := make([]StatResult, len(paths)) ch := make(chan int) for i := 0; i < workers; i++ { go func() { defer wg.Done() for idx := range ch { out[idx].Attrs, out[idx].Err = client.Stat(ctx, paths[idx]) } }() } for idx := range paths { ch <- idx } close(ch) wg.Wait() return out } // StatExisting reduces Stat() to an array of ObjectAttrs. // // Non-existent objects will return a pointer to a zero storage.ObjectAttrs. // Objects that fail to stat will be nil (and log). func StatExisting(ctx context.Context, log logrus.FieldLogger, client Stater, paths ...Path) []*storage.ObjectAttrs { out := make([]*storage.ObjectAttrs, len(paths)) attrs := Stat(ctx, client, 20, paths...) for i, attrs := range attrs { err := attrs.Err switch { case attrs.Attrs != nil: out[i] = attrs.Attrs case errors.Is(err, storage.ErrObjectNotExist): out[i] = &storage.ObjectAttrs{} default: log.WithError(err).WithField("path", paths[i]).Info("Failed to stat") } } return out } // LeastRecentlyUpdated sorts paths by their update timestamp, noting generations and any errors. func LeastRecentlyUpdated(ctx context.Context, log logrus.FieldLogger, client Stater, paths []Path) map[Path]int64 { ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() log.Debug("Sorting groups") const workers = 20 attrs := Stat(ctx, client, workers, paths...) updated := make(map[Path]time.Time, len(paths)) generations := make(map[Path]int64, len(paths)) for i, path := range paths { attrs, err := attrs[i].Attrs, attrs[i].Err switch { case err == storage.ErrObjectNotExist: generations[path] = 0 case err != nil: log.WithError(err).WithField("path", path).Warning("Stat failed") generations[path] = -1 default: updated[path] = attrs.Updated generations[path] = attrs.Generation } } sort.SliceStable(paths, func(i, j int) bool { return !updated[paths[i]].After(updated[paths[j]]) }) if n := len(paths) - 1; n > 0 { p0 := paths[0] pn := paths[n] log.WithFields(logrus.Fields{ "newest-path": pn, "newest": updated[pn], "oldest-path": p0, "oldest": updated[p0], }).Info("Sorted") } return generations } // Touch attempts to win an update of the object. // // Cloud copies the current object to itself when the object already exists. // Otherwise uploads genZero bytes. func Touch(ctx context.Context, client ConditionalClient, path Path, generation int64, genZero []byte) (*storage.ObjectAttrs, error) { var cond storage.Conditions if generation != 0 { // Attempt to cloud-copy the object to its current location // - only 1 will win in a concurrent situation // - Increases the last update time. cond.GenerationMatch = generation return client.If(&cond, &cond).Copy(ctx, path, path) } // New group, upload the bytes for this situation. cond.DoesNotExist = true return client.If(&cond, &cond).Upload(ctx, path, genZero, DefaultACL, NoCache) } // Sort the builds by monotonically decreasing original prefix base name. // // In other words, // gs://b/1 // gs://a/5 // gs://c/10 // becomes: // gs://c/10 // gs://a/5 // gs://b/1 func Sort(builds []Build) { sort.SliceStable(builds, func(i, j int) bool { // greater return !sortorder.NaturalLess(builds[i].baseName, builds[j].baseName) && builds[i].baseName != builds[j].baseName }) }