in gcs-fetcher/pkg/uploader/uploader.go [86:141]
// Do uploads the file at path to GCS under a content-addressed object name
// (the hex SHA-1 digest of its contents) and records it in the manifest.
//
// Symlinks are resolved first; when path resolves elsewhere, the resolved
// path and its Stat info replace the arguments for the rest of the call, so
// the manifest entry is keyed by the resolved path. Directories are skipped
// and yield a nil error.
//
// The manifest entry is stored only after the GCS writer has been closed
// successfully or has reported that the object already exists, so a failed
// upload never leaves behind an entry that points at a missing object.
//
// On success u.totalBytes grows by the number of bytes read from the file;
// u.bytesSkipped grows by the same amount when the object already existed.
//
// Do returns any error from resolving, stat-ing, opening, reading or seeking
// the file, and any error from writing to or closing the GCS writer other
// than the already-exists condition.
func (u *Uploader) Do(ctx context.Context, path string, info os.FileInfo) error {
	// Follow symlinks.
	spath, err := u.os.EvalSymlinks(path)
	if err != nil {
		return err
	}
	if spath != path {
		if info, err = u.os.Stat(spath); err != nil {
			return err
		}
		path = spath
	}

	// Don't process dirs.
	if info.IsDir() {
		return nil
	}

	// NOTE(review): this goes through package os directly while the calls
	// above use the u.os abstraction — confirm whether that is intentional.
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	// Compute digest of file, and count bytes, in a single pass.
	cw := &countWriter{}
	h := sha1.New()
	if _, err := io.Copy(io.MultiWriter(cw, h), f); err != nil {
		return err
	}
	digest := fmt.Sprintf("%x", h.Sum(nil))

	// Seek back to the beginning of the file, to write it to GCS.
	// NB: The GCS client is responsible for skipping writes if the file
	// already exists.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return err
	}

	wc := u.gcs.NewWriter(ctx, u.bucket, digest)
	if _, err := io.Copy(wc, f); err != nil {
		// NOTE(review): the writer is deliberately not closed here, since
		// closing could finalize a partial object; presumably the caller
		// aborts the upload by cancelling ctx — confirm.
		return err
	}

	// Closing the writer is what finalizes the upload; an already-exists
	// result means identical content is already stored under this digest.
	closeErr := wc.Close()
	skipped := isAlreadyExists(closeErr)
	if closeErr != nil && !skipped {
		return closeErr
	}

	// Record the file only now that the object is known to be in the bucket.
	u.manifest.Store(path, common.ManifestItem{
		SourceURL: fmt.Sprintf("gs://%s/%s", u.bucket, digest),
		Sha1Sum:   digest,
		FileMode:  info.Mode(),
	})

	// NOTE(review): these counters are updated without visible
	// synchronization — verify whether Do may be called concurrently.
	if skipped {
		u.bytesSkipped += cw.b
	}
	u.totalBytes += cw.b
	return nil
}