in sources.go [140:199]
func (w *Workflow) uploadSources(ctx context.Context) DError {
for dst, origPath := range w.Sources {
if origPath == "" {
continue
}
// GCS to GCS.
if bkt, objPath, err := splitGCSPath(origPath); err == nil {
if objPath == "" || strings.HasSuffix(objPath, "/") {
if err := w.recursiveGCS(ctx, bkt, objPath, dst); err != nil {
return Errf("error copying from bucket %s: %v", origPath, err)
}
continue
}
src := w.StorageClient.Bucket(bkt).Object(objPath)
dstPath := w.StorageClient.Bucket(w.bucket).Object(path.Join(w.sourcesPath, dst))
if _, err := dstPath.CopierFrom(src).Run(ctx); err != nil {
if gErr, ok := err.(*googleapi.Error); ok && gErr.Code == http.StatusNotFound {
return typedErrf(resourceDNEError, "error copying from file %s: %v", origPath, err)
}
return Errf("error copying from file %s: %v", origPath, err)
}
continue
}
// Local to GCS.
if !filepath.IsAbs(origPath) {
origPath = filepath.Join(w.workflowDir, origPath)
}
fi, err := os.Stat(origPath)
if err != nil {
return typedErr(fileIOError, "failed to open local file", err)
}
if fi.IsDir() {
var files []string
if err := filepath.Walk(origPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
files = append(files, path)
return nil
}); err != nil {
return typedErr(fileIOError, "failed to walk file path", err)
}
for _, file := range files {
obj := path.Join(dst, strings.TrimPrefix(file, filepath.Clean(origPath)))
if err := w.uploadFile(ctx, file, obj); err != nil {
return err
}
}
continue
}
if err := w.uploadFile(ctx, origPath, dst); err != nil {
return err
}
}
return nil
}