gcs-fetcher/pkg/uploader/uploader.go (125 lines of code) (raw):

/* Copyright 2018 Google, Inc. All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package uploader import ( "context" "crypto/sha1" "encoding/json" "fmt" "io" "net/http" "os" "sync" "google.golang.org/api/googleapi" "github.com/GoogleCloudPlatform/cloud-builders/gcs-fetcher/pkg/common" ) // Uploader encapsulates methods for uploading files incrementally and // producing a source manifest. type Uploader struct { gcs GCS os OS bucket, manifestObject string manifest sync.Map totalBytes, bytesSkipped int64 } // OS allows us to inject dependencies to facilitate testing. type OS interface { EvalSymlinks(path string) (string, error) Stat(path string) (os.FileInfo, error) } // GCS allows us to inject dependencies to facilitate testing. type GCS interface { NewWriter(ctx context.Context, bucket, object string) io.WriteCloser } type job struct { path string info os.FileInfo } // New returns a new Uploader. func New(ctx context.Context, gcs GCS, os OS, bucket, manifestObject string, numWorkers int) *Uploader { return &Uploader{ gcs: gcs, os: os, bucket: bucket, manifestObject: manifestObject, } } // Wait blocks until ongoing uploads are complete, or until an error is // encountered. func (u *Uploader) Done(ctx context.Context) error { uploaded := u.totalBytes - u.bytesSkipped var incr float64 if u.totalBytes != 0 { incr = float64(100 * u.bytesSkipped / u.totalBytes) } fmt.Printf(` ****************************************************** * Uploaded %d bytes (%.2f%% incremental) ****************************************************** `, uploaded, incr) return u.writeManifest(ctx) } func (u *Uploader) Do(ctx context.Context, path string, info os.FileInfo) error { // Follow symlinks. if spath, err := u.os.EvalSymlinks(path); err != nil { return err } else if spath != path { info, err = u.os.Stat(spath) if err != nil { return err } path = spath } // Don't process dirs. if info.IsDir() { return nil } f, err := os.Open(path) if err != nil { return err } defer f.Close() // Compute digest of file, and count bytes. cw := &countWriter{} h := sha1.New() if _, err := io.Copy(io.MultiWriter(cw, h), f); err != nil { return err } digest := fmt.Sprintf("%x", h.Sum(nil)) // Seek back to the beginning of the file, to write it to GCS. // NB: The GCS client is responsible for skipping writes if the file // already exists. if _, err := f.Seek(0, 0); err != nil { return err } wc := u.gcs.NewWriter(ctx, u.bucket, digest) if _, err := io.Copy(wc, f); err != nil { return err } u.manifest.Store(path, common.ManifestItem{ SourceURL: fmt.Sprintf("gs://%s/%s", u.bucket, digest), Sha1Sum: digest, FileMode: info.Mode(), }) if err := wc.Close(); isAlreadyExists(err) { u.bytesSkipped += cw.b } else if err != nil { return err } u.totalBytes += cw.b return nil } type countWriter struct { b int64 } func (c *countWriter) Write(b []byte) (int, error) { c.b += int64(len(b)) return len(b), nil } func isAlreadyExists(err error) bool { if gerr, ok := err.(*googleapi.Error); ok && gerr.Code == http.StatusPreconditionFailed { return true } return false } func (u *Uploader) writeManifest(ctx context.Context) error { m := map[string]common.ManifestItem{} u.manifest.Range(func(k, v interface{}) bool { m[k.(string)] = v.(common.ManifestItem) return true }) wc := u.gcs.NewWriter(ctx, u.bucket, u.manifestObject) if err := json.NewEncoder(wc).Encode(m); err != nil { return err } if err := wc.Close(); err != nil { return err } fmt.Printf("Wrote manifest object gs://%s/%s", u.bucket, u.manifestObject) return nil }