func()

in sources.go [140:199]


func (w *Workflow) uploadSources(ctx context.Context) DError {
	for dst, origPath := range w.Sources {
		if origPath == "" {
			continue
		}
		// GCS to GCS.
		if bkt, objPath, err := splitGCSPath(origPath); err == nil {
			if objPath == "" || strings.HasSuffix(objPath, "/") {
				if err := w.recursiveGCS(ctx, bkt, objPath, dst); err != nil {
					return Errf("error copying from bucket %s: %v", origPath, err)
				}
				continue
			}
			src := w.StorageClient.Bucket(bkt).Object(objPath)
			dstPath := w.StorageClient.Bucket(w.bucket).Object(path.Join(w.sourcesPath, dst))
			if _, err := dstPath.CopierFrom(src).Run(ctx); err != nil {
				if gErr, ok := err.(*googleapi.Error); ok && gErr.Code == http.StatusNotFound {
					return typedErrf(resourceDNEError, "error copying from file %s: %v", origPath, err)
				}
				return Errf("error copying from file %s: %v", origPath, err)
			}
			continue
		}

		// Local to GCS.
		if !filepath.IsAbs(origPath) {
			origPath = filepath.Join(w.workflowDir, origPath)
		}
		fi, err := os.Stat(origPath)
		if err != nil {
			return typedErr(fileIOError, "failed to open local file", err)
		}
		if fi.IsDir() {
			var files []string
			if err := filepath.Walk(origPath, func(path string, info os.FileInfo, err error) error {
				if err != nil {
					return err
				}
				if info.IsDir() {
					return nil
				}
				files = append(files, path)
				return nil
			}); err != nil {
				return typedErr(fileIOError, "failed to walk file path", err)
			}
			for _, file := range files {
				obj := path.Join(dst, strings.TrimPrefix(file, filepath.Clean(origPath)))
				if err := w.uploadFile(ctx, file, obj); err != nil {
					return err
				}
			}
			continue
		}
		if err := w.uploadFile(ctx, origPath, dst); err != nil {
			return err
		}
	}
	return nil
}