// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package daisy

import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"path/filepath"
"regexp"
"strings"
"sync"
"cloud.google.com/go/storage"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
)
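
// objectRegistry tracks the GCS objects created by workflow steps so that
// no two steps create the same object.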
type objectRegistry struct {
created []string
mx sync.Mutex
}
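
// newObjectRegistry returns an empty registry. The *Workflow parameter is
// unused here, presumably kept for symmetry with the package's other
// registry constructors.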
func newObjectRegistry(w *Workflow) *objectRegistry {
return &objectRegistry{}
}
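
// regCreate records object as created, or returns an error if another step
// already created it.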
func (o *objectRegistry) regCreate(object string) DError {
o.mx.Lock()
defer o.mx.Unlock()
if strIn(object, o.created) {
return Errf("cannot create object %q, object already created by another step", object)
}
o.created = append(o.created, object)
return nil
}
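
// sourceVarRgx matches ${SOURCE:<name>} references in workflow fields;
// they are presumably resolved against w.Sources via sourceContent by the
// substitution logic elsewhere in the package.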
var sourceVarRgx = regexp.MustCompile(`\$\{SOURCE:([^}]+)}`)
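
// recursiveGCS copies every object under gs://bkt/prefix into this
// workflow's sources path under dst, preserving each object's path relative
// to prefix.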
func (w *Workflow) recursiveGCS(ctx context.Context, bkt, prefix, dst string) DError {
it := w.StorageClient.Bucket(bkt).Objects(ctx, &storage.Query{Prefix: prefix})
for objAttr, err := it.Next(); err != iterator.Done; objAttr, err = it.Next() {
if err != nil {
return typedErr(apiError, "failed to iterate GCS objects for uploading", err)
}
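		// Skip zero-byte objects such as "directory" placeholders.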
if objAttr.Size == 0 {
continue
}
srcPath := w.StorageClient.Bucket(bkt).Object(objAttr.Name)
o := path.Join(w.sourcesPath, dst, strings.TrimPrefix(objAttr.Name, prefix))
dstPath := w.StorageClient.Bucket(w.bucket).Object(o)
if _, err := dstPath.CopierFrom(srcPath).Run(ctx); err != nil {
return typedErr(apiError, "failed to upload GCS object", err)
}
}
return nil
}
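
// sourceExists reports whether s is a declared source of the workflow.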
func (w *Workflow) sourceExists(s string) bool {
_, ok := w.Sources[s]
return ok
}
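
// sourceContent returns the content of source s. GCS sources are read from
// the bucket (and rejected above 1 KiB); anything else is read from the
// local filesystem, relative to the workflow directory when the path is not
// absolute.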
func (w *Workflow) sourceContent(ctx context.Context, s string) (string, error) {
src, ok := w.Sources[s]
if !ok {
return "", Errf("source not found: %s", s)
}
// Try GCS file first.
if bkt, objPath, err := splitGCSPath(src); err == nil {
if objPath == "" || strings.HasSuffix(objPath, "/") {
return "", Errf("source %s appears to be a GCS 'bucket'", src)
}
src := w.StorageClient.Bucket(bkt).Object(objPath)
r, err := src.NewReader(ctx)
if err != nil {
return "", Errf("error reading from file %s/%s: %v", bkt, objPath, err)
}
defer r.Close()
		// Refuse to read sources larger than 1 KiB.
		if r.Size() > 1024 {
			return "", Errf("source file %s/%s is too large: %d bytes (limit 1024)", bkt, objPath, r.Size())
		}
var buf bytes.Buffer
if _, err := io.Copy(&buf, r); err != nil {
return "", Errf("error reading from file %s/%s: %v", bkt, objPath, err)
}
return buf.String(), nil
}
// Fall back to local read.
if !filepath.IsAbs(src) {
src = filepath.Join(w.workflowDir, src)
}
if _, err := os.Stat(src); err != nil {
return "", typedErr(fileIOError, "failed to find local file", err)
}
	d, err := os.ReadFile(src)
if err != nil {
return "", newErr("failed to read local file content", err)
}
return string(d), nil
}
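
// uploadFile uploads the local file src to this workflow's sources path as
// object obj.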
func (w *Workflow) uploadFile(ctx context.Context, src, obj string) DError {
obj = filepath.ToSlash(obj)
dstPath := w.StorageClient.Bucket(w.bucket).Object(path.Join(w.sourcesPath, obj))
gcs := dstPath.NewWriter(ctx)
	f, err := os.Open(src)
	if err != nil {
		return newErr("failed to open local file for uploading", err)
	}
	defer f.Close()
	if _, err := io.Copy(gcs, f); err != nil {
		return newErr("failed to copy local file to GCS", err)
	}
	// newErr returns nil for a nil error, so a successful Close reports no error.
	return newErr("failed to close GCS object", gcs.Close())
}
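
// uploadSources stages every entry of w.Sources into
// gs://<w.bucket>/<w.sourcesPath>/<dst>: GCS sources are copied server side
// (recursively when the source names a bucket or "directory" prefix), and
// local files or directories are uploaded via uploadFile.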
func (w *Workflow) uploadSources(ctx context.Context) DError {
for dst, origPath := range w.Sources {
if origPath == "" {
continue
}
// GCS to GCS.
if bkt, objPath, err := splitGCSPath(origPath); err == nil {
if objPath == "" || strings.HasSuffix(objPath, "/") {
if err := w.recursiveGCS(ctx, bkt, objPath, dst); err != nil {
return Errf("error copying from bucket %s: %v", origPath, err)
}
continue
}
src := w.StorageClient.Bucket(bkt).Object(objPath)
dstPath := w.StorageClient.Bucket(w.bucket).Object(path.Join(w.sourcesPath, dst))
if _, err := dstPath.CopierFrom(src).Run(ctx); err != nil {
if gErr, ok := err.(*googleapi.Error); ok && gErr.Code == http.StatusNotFound {
return typedErrf(resourceDNEError, "error copying from file %s: %v", origPath, err)
}
return Errf("error copying from file %s: %v", origPath, err)
}
continue
}
// Local to GCS.
if !filepath.IsAbs(origPath) {
origPath = filepath.Join(w.workflowDir, origPath)
}
fi, err := os.Stat(origPath)
if err != nil {
			return typedErr(fileIOError, "failed to stat local file", err)
}
if fi.IsDir() {
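			// Collect the regular files under the directory; the walk skips
			// directory entries themselves.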
var files []string
if err := filepath.Walk(origPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
files = append(files, path)
return nil
}); err != nil {
return typedErr(fileIOError, "failed to walk file path", err)
}
for _, file := range files {
obj := path.Join(dst, strings.TrimPrefix(file, filepath.Clean(origPath)))
if err := w.uploadFile(ctx, file, obj); err != nil {
return err
}
}
continue
}
if err := w.uploadFile(ctx, origPath, dst); err != nil {
return err
}
}
return nil
}
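
// A hypothetical example of how sources are staged: given
//
//	w.Sources = map[string]string{
//		"startup.sh": "./scripts/startup.sh",    // local file, uploaded
//		"files/":     "gs://example-bucket/in/", // GCS "directory", copied recursively
//	}
//
// uploadSources stages gs://<w.bucket>/<w.sourcesPath>/startup.sh and
// gs://<w.bucket>/<w.sourcesPath>/files/... for use by the workflow's steps.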