gcs-fetcher/pkg/fetcher/fetcher.go
/*
Copyright 2018 Google, Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fetcher
import (
"archive/tar"
"archive/zip"
"compress/gzip"
"context"
"crypto/sha1"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"math"
"math/rand"
"net/http"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
"github.com/GoogleCloudPlatform/cloud-builders/gcs-fetcher/pkg/common"
"google.golang.org/api/googleapi"
)
var (
// sourceExt is a best-effort set of extensions identifying files that
// should download quickly and thus get a short timeout on the first tries.
sourceExt = map[string]bool{
".js": true,
".py": true,
".php": true,
".java": true,
".go": true,
".cs": true,
".rb": true,
".css": true,
".vb": true,
".pl": true, // Perl. C'mon, live a little.
}
sourceTimeout = map[int]time.Duration{ // try number -> timeout duration
0: 1 * time.Second,
1: 2 * time.Second,
}
notSourceTimeout = map[int]time.Duration{ // try number -> timeout duration
0: 3 * time.Second,
1: 6 * time.Second,
}
defaultTimeout = 1 * time.Hour
noTimeout = 0 * time.Second
errGCSTimeout = errors.New("GCS timeout")
robotRegex = regexp.MustCompile(`<Details>(\S+@\S+)\s`)
nonHexRegex = regexp.MustCompile(`[^0-9a-f]`)
errorExitStatus = 1
permissionDeniedExitStatus = 3
)
type sizeBytes int64
// job is a file to download; it corresponds to an entry in the manifest file.
type job struct {
filename string
bucket, object string
generation int64
sha1sum string
destDirOverride string
}
// jobAttempt is an attempt to download a particular file; it may result in
// success or failure (indicated by err).
type jobAttempt struct {
started time.Time
duration time.Duration
err error
gcsTimeout time.Duration
}
// jobReport stores all the details about the attempts to download a
// particular file.
type jobReport struct {
job job
started time.Time
completed time.Time
size sizeBytes
attempts []jobAttempt
success bool
finalname string
err error
}
type fetchOnceResult struct {
size sizeBytes
err error
}
type stats struct {
workers int
files int
size sizeBytes
duration time.Duration
retries int
gcsTimeouts int
success bool
errs []error
}
// OS allows us to inject dependencies to facilitate testing.
type OS interface {
Rename(oldpath, newpath string) error
Chmod(name string, mode os.FileMode) error
Create(name string) (*os.File, error)
MkdirAll(path string, perm os.FileMode) error
Open(name string) (*os.File, error)
RemoveAll(path string) error
}
// GCS allows us to inject dependencies to facilitate testing.
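// The concrete implementation is supplied by the caller; in production it
// presumably wraps the Cloud Storage client, while tests can substitute a fake.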
type GCS interface {
NewReader(ctx context.Context, bucket, object string) (io.ReadCloser, error)
}
// Fetcher is the main workhorse of this package and does all the heavy lifting.
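//
// A minimal usage sketch (illustrative only; gcsClient and realOS stand in
// for whatever GCS and OS implementations the caller wires up):
//
//	gf := &Fetcher{
//		GCS:         gcsClient,
//		OS:          realOS{},
//		DestDir:     "/workspace",
//		StagingDir:  "/workspace/.staging",
//		CreatedDirs: map[string]bool{},
//		SourceType:  "Manifest",
//		Bucket:      "my-bucket",
//		Object:      "source-manifest.json",
//		WorkerCount: 20,
//		Retries:     3,
//		Backoff:     100 * time.Millisecond,
//		Stdout:      os.Stdout,
//		Stderr:      os.Stderr,
//	}
//	if err := gf.Fetch(context.Background()); err != nil {
//		log.Fatal(err)
//	}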
type Fetcher struct {
GCS GCS
OS OS
DestDir string
KeepSource bool
StagingDir string
// mu guards CreatedDirs
mu sync.Mutex
CreatedDirs map[string]bool
SourceType string
Bucket, Object string
Generation int64
TimeoutGCS bool
WorkerCount int
Retries int
Backoff time.Duration
Verbose bool
Stdout io.Writer
Stderr io.Writer
}
type permissionError struct {
bucket string
robot string
}
func (e *permissionError) Error() string {
return fmt.Sprintf("Access to bucket %s denied. You must grant Storage Object Viewer permission to %s. If you are using VPC Service Controls, you must also grant it access to your service perimeter.", e.bucket, e.robot)
}
func logit(writer io.Writer, format string, a ...interface{}) {
if _, err := fmt.Fprintf(writer, format+"\n", a...); err != nil {
log.Printf("Failed to write message: "+format, a...)
}
}
func (gf *Fetcher) log(format string, a ...interface{}) {
logit(gf.Stdout, format, a...)
}
func (gf *Fetcher) logErr(format string, a ...interface{}) {
logit(gf.Stderr, format, a...)
}
func (gf *Fetcher) recordFailure(j job, started time.Time, gcsTimeout time.Duration, err error, report *jobReport) {
attempt := jobAttempt{
started: started,
duration: time.Since(started),
err: err,
gcsTimeout: gcsTimeout,
}
report.success = false
report.err = err // Hold the latest error.
report.attempts = append(report.attempts, attempt)
isLast := len(report.attempts) > gf.Retries // fetchObject makes gf.Retries+1 attempts in total.
if gf.Verbose || isLast {
retryMsg := ", will retry"
if isLast {
retryMsg = ", will no longer retry"
}
gf.log("Failed to fetch %s%s: %v", formatGCSName(j.bucket, j.object, j.generation), retryMsg, err)
}
}
func (gf *Fetcher) recordSuccess(j job, started time.Time, size sizeBytes, finalname string, report *jobReport) {
attempt := jobAttempt{
started: started,
duration: time.Since(started),
}
report.success = true
report.err = nil
report.size = size
report.attempts = append(report.attempts, attempt)
report.finalname = finalname
mibps := math.MaxFloat64
if attempt.duration > 0 {
mibps = (float64(report.size) / 1024 / 1024) / attempt.duration.Seconds()
}
if gf.Verbose {
gf.log("Fetched %s (%dB in %v, %.2fMiB/s)", formatGCSName(j.bucket, j.object, j.generation), report.size, attempt.duration, mibps)
}
}
// fetchObject is responsible for trying (and retrying) to fetch a single file
// from GCS. It first downloads the file to a temp file, then renames it to
// the final location and sets the permissions on the final file.
func (gf *Fetcher) fetchObject(ctx context.Context, j job) *jobReport {
report := &jobReport{job: j, started: time.Now()}
defer func() {
report.completed = time.Now()
}()
var tmpfile string
var backoff time.Duration
// Within a manifest, multiple files may have the same SHA. This can lead
// to a race condition between the worker goroutines that are downloading
// the files concurrently. To mitigate this issue, we add some randomness
// to the name of the temp file being pulled.
fuzz := rand.Intn(999999)
for retrynum := 0; retrynum <= gf.Retries; retrynum++ {
// Apply appropriate retry backoff.
if retrynum > 0 {
if retrynum == 1 {
backoff = gf.Backoff
} else {
backoff *= 2
}
time.Sleep(backoff)
}
started := time.Now()
// Download to temp location [StagingDir]/[bucket]-[object]-[fuzz]-[retrynum].
// If fetchObjectOnceWithTimeout() times out, this file will be orphaned and we can
// clean it up later.
tmpfile = filepath.Join(gf.StagingDir, fmt.Sprintf("%s-%s-%d-%d", j.bucket, j.object, fuzz, retrynum))
if err := gf.ensureFolders(tmpfile); err != nil {
e := fmt.Errorf("creating folders for temp file %q: %v", tmpfile, err)
gf.recordFailure(j, started, noTimeout, e, report)
continue
}
allowedGCSTimeout := gf.timeout(j.filename, retrynum)
size, err := gf.fetchObjectOnceWithTimeout(ctx, j, allowedGCSTimeout, tmpfile)
if err != nil {
// Allow permissionError to bubble up.
e := err
if _, ok := err.(*permissionError); !ok {
e = fmt.Errorf("fetching %q with timeout %v to temp file %q: %v", formatGCSName(j.bucket, j.object, j.generation), allowedGCSTimeout, tmpfile, err)
}
gf.recordFailure(j, started, allowedGCSTimeout, e, report)
continue
}
// Rename the temp file to the final filename
dest := gf.DestDir
if j.destDirOverride != "" {
dest = j.destDirOverride
}
finalname := filepath.Join(dest, j.filename)
if err := gf.ensureFolders(finalname); err != nil {
e := fmt.Errorf("creating folders for final file %q: %v", finalname, err)
gf.recordFailure(j, started, noTimeout, e, report)
continue
}
if err := gf.OS.Rename(tmpfile, finalname); err != nil {
e := fmt.Errorf("renaming %q to %q: %v", tmpfile, finalname, err)
gf.recordFailure(j, started, noTimeout, e, report)
continue
}
// TODO(jasonco): make the posix attributes match the source
// This will only work if the original upload sends the posix
// attributes to GCS. For now, we'll just give the user full
// access.
mode := os.FileMode(0555)
if err := gf.OS.Chmod(finalname, mode); err != nil {
e := fmt.Errorf("chmod %q to %v: %v", finalname, mode, err)
gf.recordFailure(j, started, noTimeout, e, report)
continue
}
gf.recordSuccess(j, started, size, finalname, report)
break // Success! No more retries needed.
}
return report
}
// fetchObjectOnceWithTimeout is merely mechanics to call fetchObjectOnce(),
// using a circuit-breaker pattern to time out the call if it takes too long.
// GCS has long tail latencies, so we retry with low timeouts on the first
// couple of attempts. On subsequent attempts, we simply wait for a long time.
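// Note that after a timeout the abandoned fetchObjectOnce goroutine keeps
// running until it next checks breakerSig; any temp file it wrote is
// orphaned and cleaned up with the staging directory later.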
func (gf *Fetcher) fetchObjectOnceWithTimeout(ctx context.Context, j job, timeout time.Duration, dest string) (sizeBytes, error) {
result := make(chan fetchOnceResult, 1)
breakerSig := make(chan struct{}, 1)
// Start the function that we want to timeout if it takes too long.
go func() {
result <- gf.fetchObjectOnce(ctx, j, dest, breakerSig)
}()
// Wait to see which finishes first: the function or the timeout.
select {
case r := <-result:
return r.size, r.err
case <-ctx.Done():
close(breakerSig) // Signal fetchObjectOnce() to cancel
if ctx.Err() == context.DeadlineExceeded {
return 0, errGCSTimeout
}
return 0, ctx.Err()
case <-time.After(timeout):
close(breakerSig) // Signal fetchObjectOnce() to cancel
return 0, errGCSTimeout
}
}
// fetchObjectOnce has the responsibility of downloading a file from
// GCS and saving it to the dest location. If it receives a signal on
// breakerSig, it will attempt to return quickly, though it is assumed
// that no one is listening for a response anymore.
func (gf *Fetcher) fetchObjectOnce(ctx context.Context, j job, dest string, breakerSig <-chan struct{}) (result fetchOnceResult) {
r, err := gf.GCS.NewReader(ctx, j.bucket, j.object)
if err != nil {
// Check for AccessDenied failure here and return a useful error message on Stderr and exit immediately.
if gerr, ok := err.(*googleapi.Error); ok && gerr.Code == http.StatusForbidden {
// Try to parse out the robot name.
match := robotRegex.FindStringSubmatch(err.Error())
robot := "your Cloud Build service account"
if len(match) == 2 {
robot = match[1]
}
result.err = &permissionError{bucket: j.bucket, robot: robot}
return result
}
result.err = fmt.Errorf("creating GCS reader for %q: %v", formatGCSName(j.bucket, j.object, j.generation), err)
return result
}
defer func() {
// result is a named return, so this assignment reaches the caller; don't
// let a close error mask an earlier one.
if cerr := r.Close(); cerr != nil && result.err == nil {
result.err = fmt.Errorf("Failed to close GCS reader: %v", cerr)
}
}()
// If we're cancelled, just return.
select {
case <-breakerSig:
result.err = errGCSTimeout
return result
default:
// Fallthrough
}
f, err := gf.OS.Create(dest)
if err != nil {
result.err = fmt.Errorf("creating destination file %q: %v", dest, err)
return result
}
defer func() {
if cerr := f.Close(); cerr != nil && result.err == nil {
result.err = fmt.Errorf("Failed to close file %q: %v", dest, cerr)
}
}()
h := sha1.New()
n, err := io.Copy(f, io.TeeReader(r, h))
if err != nil {
result.err = fmt.Errorf("copying bytes from %q to %q: %v", formatGCSName(j.bucket, j.object, j.generation), dest, err)
return result
}
// If we're cancelled, just return.
select {
case <-breakerSig:
result.err = errGCSTimeout
return result
default:
// Fallthrough
}
result.size = sizeBytes(n)
// Verify the sha1sum before declaring success.
if j.sha1sum != "" {
got := strings.ToLower(fmt.Sprintf("%x", h.Sum(nil)))
want := strings.ToLower(nonHexRegex.ReplaceAllString(j.sha1sum, ""))
if got != want {
result.err = fmt.Errorf("%s SHA mismatch, got %q, want %q", j.filename, got, want)
return result
}
}
return result
}
// ensureFolders takes a full path to a filename and makes sure that
// all the folders leading to the filename exist.
func (gf *Fetcher) ensureFolders(filename string) error {
filedir := filepath.Dir(filename)
gf.mu.Lock()
defer gf.mu.Unlock()
if _, ok := gf.CreatedDirs[filedir]; !ok {
if err := gf.OS.MkdirAll(filedir, os.FileMode(0777)|os.ModeDir); err != nil {
return fmt.Errorf("ensuring folders for %q: %v", filedir, err)
}
gf.CreatedDirs[filedir] = true
}
return nil
}
// doWork is the worker routine. It listens for jobs, fetches the file,
// and emits a job report. This continues until the todo channel is closed.
func (gf *Fetcher) doWork(ctx context.Context, todo <-chan job, results chan<- jobReport) {
for j := range todo {
report := gf.fetchObject(ctx, j)
if gf.Verbose {
gf.log("Report: %#v", report)
}
results <- *report
}
}
// processJobs is the primary concurrency mechanics for Fetcher.
// This method spins up a set of worker goroutines, creates a
// goroutine to send all the jobs to the workers, then waits for
// all the jobs to complete. It also compiles and returns final
// statistics for the jobs.
func (gf *Fetcher) processJobs(ctx context.Context, jobs []job) stats {
workerCount := gf.WorkerCount
if len(jobs) < workerCount {
workerCount = len(jobs)
}
todo := make(chan job, workerCount)
results := make(chan jobReport, workerCount)
stats := stats{workers: workerCount, files: len(jobs), success: true}
// Spin up our workers.
var wg sync.WaitGroup
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
gf.doWork(ctx, todo, results)
wg.Done()
}()
}
// Queue the jobs.
started := time.Now()
var qwg sync.WaitGroup
qwg.Add(1)
go func() {
for _, j := range jobs {
todo <- j
}
qwg.Done()
}()
// Consume the reports.
failed := false
hasPermissionError := false
for n := 0; n < len(jobs); n++ {
report := <-results
if !report.success {
if err, ok := report.err.(*permissionError); ok {
hasPermissionError = true
gf.logErr(err.Error())
}
failed = true
}
stats.size += report.size
lastIndex := len(report.attempts) - 1
stats.retries += lastIndex // First attempt is not considered a "retry".
finalAttempt := report.attempts[lastIndex]
if finalAttempt.err != nil {
stats.errs = append(stats.errs, finalAttempt.err)
}
for _, attempt := range report.attempts {
if attempt.gcsTimeout > noTimeout {
stats.gcsTimeouts++
}
}
}
qwg.Wait()
close(results)
close(todo)
wg.Wait()
if failed {
stats.success = false
gf.logErr("Failed to download at least one file. Cannot continue.")
if hasPermissionError {
os.Exit(permissionDeniedExitStatus)
}
os.Exit(errorExitStatus)
}
stats.duration = time.Since(started)
return stats
}
// timeout returns the GCS timeout that should be used for a given
// filename on a given retry number. GCS has long tails on occasion, so
// in some cases, it's faster to give up early and retry on a second
// connection.
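//
// With TimeoutGCS enabled, the effective schedule derived from the
// sourceTimeout and notSourceTimeout tables above is:
//
//	"main.go"  try 0 -> 1s, try 1 -> 2s, try 2+ -> 1h
//	"blob.bin" try 0 -> 3s, try 1 -> 6s, try 2+ -> 1h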
func (gf *Fetcher) timeout(filename string, retrynum int) time.Duration {
if !gf.TimeoutGCS {
return defaultTimeout
}
// Use short timeouts for source code, longer for non-source
if sourceExt[filepath.Ext(filename)] {
if timeout, ok := sourceTimeout[retrynum]; ok {
return timeout
}
} else {
if timeout, ok := notSourceTimeout[retrynum]; ok {
return timeout
}
}
return defaultTimeout
}
// fetchFromManifest is used when downloading source based on a manifest file.
// It is responsible for fetching the manifest file, decoding the JSON, and
// assembling the list of jobs to process (i.e., files to download).
func (gf *Fetcher) fetchFromManifest(ctx context.Context) (err error) {
started := time.Now()
gf.log("Fetching manifest %s.", formatGCSName(gf.Bucket, gf.Object, gf.Generation))
// Download the manifest file from GCS.
manifestDir := gf.StagingDir
j := job{
filename: gf.Object,
bucket: gf.Bucket,
object: gf.Object,
generation: gf.Generation,
destDirOverride: manifestDir,
}
// Override the retry/backoff to span an up-to-11 second eventual consistency
// issue on new project creation. We'll only do this for the first file
// (the manifest), and then drop back to the original retry/backoff.
oretries, obackoff := gf.Retries, gf.Backoff
gf.Retries, gf.Backoff = 6, 1*time.Second // Yields backoffs of 1s, 2s, 4s, 8s, 16s, 32s.
report := gf.fetchObject(ctx, j)
gf.Retries, gf.Backoff = oretries, obackoff
if !report.success {
if err, ok := report.err.(*permissionError); ok {
gf.logErr(err.Error())
os.Exit(permissionDeniedExitStatus)
}
return fmt.Errorf("failed to download manifest %s: %v", formatGCSName(gf.Bucket, gf.Object, gf.Generation), report.err)
}
// Decode the JSON manifest
manifestFile := filepath.Join(manifestDir, j.filename)
r, err := gf.OS.Open(manifestFile)
if err != nil {
return fmt.Errorf("opening manifest file %q: %v", manifestFile, err)
}
defer func() {
if cerr := r.Close(); cerr != nil {
err = fmt.Errorf("Failed to close file %q: %v", manifestFile, cerr)
}
}()
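// The manifest maps destination filenames to GCS source locations. A sketch
// of the expected shape (the JSON key names here are assumptions; the real
// names are governed by common.ManifestItem's struct tags):
//
//	{
//	  "main.go": {
//	    "sourceUrl": "gs://my-bucket/deps/abc123#1234567890",
//	    "sha1sum": "da39a3ee5e6b4b0d3255bfef95601890afd80709"
//	  }
//	}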
var files map[string]common.ManifestItem
if err := json.NewDecoder(r).Decode(&files); err != nil {
return fmt.Errorf("decoding JSON from manifest file %q: %v", manifestFile, err)
}
// Create the jobs
var jobs []job
for filename, info := range files {
bucket, object, generation, err := common.ParseBucketObject(info.SourceURL)
if err != nil {
return fmt.Errorf("parsing bucket/object from %q: %v", info.SourceURL, err)
}
j := job{
filename: filename,
bucket: bucket,
object: object,
generation: generation,
sha1sum: info.Sha1Sum,
}
jobs = append(jobs, j)
}
gf.log("Processing %v files.", len(jobs))
stats := gf.processJobs(ctx, jobs)
// Final cleanup of failed downloads. We won't miss any files; these vestiges
// are from goroutines that have timed out and would otherwise check their
// circuit breaker and die. However, we won't wait for these remaining
// goroutines to finish because our goal is to get done as fast as possible!
if err := gf.OS.RemoveAll(gf.StagingDir); err != nil {
gf.log("Failed to remove staging dir %v, continuing: %v", gf.StagingDir, err)
}
// Emit final stats.
mib := float64(stats.size) / 1024 / 1024
var mibps float64
if stats.duration > 0 {
mibps = mib / stats.duration.Seconds()
}
manifestDuration := report.attempts[len(report.attempts)-1].duration
status := "SUCCESS"
if !stats.success {
status = "FAILURE"
}
gf.log("******************************************************")
gf.log("Status: %s", status)
gf.log("Started: %s", started.Format(time.RFC3339))
gf.log("Completed: %s", time.Now().Format(time.RFC3339))
gf.log("Requested workers: %6d", gf.WorkerCount)
gf.log("Actual workers: %6d", stats.workers)
gf.log("Total files: %6d", stats.files)
gf.log("Total retries: %6d", stats.retries)
if gf.TimeoutGCS {
gf.log("GCS timeouts: %6d", stats.gcsTimeouts)
}
gf.log("MiB downloaded: %9.2f MiB", mib)
gf.log("MiB/s throughput: %9.2f MiB/s", mibps)
gf.log("Time for manifest: %9.2f ms", float64(manifestDuration)/float64(time.Millisecond))
gf.log("Total time: %9.2f s", time.Since(started).Seconds())
gf.log("******************************************************")
if len(stats.errs) > 0 {
var es []string
es = append(es, fmt.Sprintf("Errors (%d):", len(stats.errs)))
for _, e := range stats.errs {
es = append(es, fmt.Sprintf(" - %s", e))
}
return errors.New(strings.Join(es, "\n"))
}
return nil
}
func (gf *Fetcher) copyFile(name string, mode os.FileMode, rc io.ReadCloser) (err error) {
defer func() {
if cerr := rc.Close(); cerr != nil {
err = fmt.Errorf("Failed to close file %q: %v", name, cerr)
}
}()
targetFile := filepath.Join(gf.DestDir, name)
// Parent directories need execute permission regardless of the file's mode.
if err := gf.OS.MkdirAll(filepath.Dir(targetFile), os.FileMode(0777)|os.ModeDir); err != nil {
return err
}
targetWriter, err := os.OpenFile(targetFile, os.O_WRONLY|os.O_CREATE, mode)
if err != nil {
return fmt.Errorf("failed to open target file %q: %v", targetFile, err)
}
defer func() {
if cerr := targetWriter.Close(); cerr != nil {
err = fmt.Errorf("Failed to close file %q: %v", targetFile, cerr)
}
}()
if _, err := io.Copy(targetWriter, rc); err != nil {
return fmt.Errorf("failed to copy %q to %q: %v", name, targetFile, err)
}
return nil
}
// fetchFromZip is used when downloading a single zip of source files. It is
// responsible for fetching the zip file and unzipping it into the destination folder.
func (gf *Fetcher) fetchFromZip(ctx context.Context) (err error) {
started := time.Now()
gf.log("Fetching archive %s.", formatGCSName(gf.Bucket, gf.Object, gf.Generation))
// Download the archive from GCS.
zipDir := gf.StagingDir
j := job{
filename: gf.Object,
bucket: gf.Bucket,
object: gf.Object,
generation: gf.Generation,
destDirOverride: zipDir,
}
report := gf.fetchObject(ctx, j)
if !report.success {
if err, ok := report.err.(*permissionError); ok {
gf.logErr(err.Error())
os.Exit(permissionDeniedExitStatus)
}
return fmt.Errorf("failed to download archive %s: %v", formatGCSName(gf.Bucket, gf.Object, gf.Generation), report.err)
}
// Unzip into the destination directory
zipfile := filepath.Join(zipDir, gf.Object)
unzipStart := time.Now()
numFiles, err := unzip(zipfile, gf.DestDir)
if err != nil {
return err
}
unzipDuration := time.Since(unzipStart)
if !gf.KeepSource {
// Remove the zip file (best effort only, no harm if this fails).
if err := gf.OS.RemoveAll(zipfile); err != nil {
gf.log("Failed to remove zipfile %s, continuing: %v", zipfile, err)
}
// Final cleanup of staging directory, which is only a temporary staging
// location for downloading the zipfile in this case.
if err := gf.OS.RemoveAll(gf.StagingDir); err != nil {
gf.log("Failed to remove staging dir %q, continuing: %v", gf.StagingDir, err)
}
}
mib := float64(report.size) / 1024 / 1024
var mibps float64
zipfileDuration := report.attempts[len(report.attempts)-1].duration
if zipfileDuration > 0 {
mibps = mib / zipfileDuration.Seconds()
}
gf.log("******************************************************")
gf.log("Status: SUCCESS")
gf.log("Started: %s", started.Format(time.RFC3339))
gf.log("Completed: %s", time.Now().Format(time.RFC3339))
gf.log("Total files: %6d", numFiles)
gf.log("MiB downloaded: %9.2f MiB", mib)
gf.log("MiB/s throughput: %9.2f MiB/s", mibps)
gf.log("Time for zipfile: %9.2f s", zipfileDuration.Seconds())
gf.log("Time to unzip: %9.2f s", unzipDuration.Seconds())
gf.log("Total time: %9.2f s", time.Since(started).Seconds())
gf.log("******************************************************")
return nil
}
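// unzip extracts zipfile into dest and returns the number of regular files
// written. Entry names are joined onto dest without sanitization, so the
// archive is assumed to be trusted; a containment ("Zip Slip") guard would
// look roughly like this sketch (illustrative, not wired in):
//
//	if !strings.HasPrefix(filepath.Clean(target), filepath.Clean(dest)+string(os.PathSeparator)) {
//		return 0, fmt.Errorf("illegal path %q in archive", file.Name)
//	}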
func unzip(zipfile, dest string) (numFiles int, err error) {
zipReader, err := zip.OpenReader(zipfile)
if err != nil {
return 0, fmt.Errorf("opening archive %s: %v", zipfile, err)
}
defer func() {
if cerr := zipReader.Close(); cerr != nil {
err = fmt.Errorf("closing archive %s: %v", zipfile, cerr)
}
}()
numFiles = 0
for _, file := range zipReader.File {
target := filepath.Join(dest, file.Name)
if file.FileInfo().IsDir() {
// Create directory with appropriate permissions if it doesn't exist.
if _, err := os.Stat(target); os.IsNotExist(err) {
if err := os.MkdirAll(target, file.Mode()); err != nil {
return 0, fmt.Errorf("making directory %s: %v", target, err)
}
continue
} else if err != nil {
return 0, fmt.Errorf("checking existence on %s: %v", target, err)
}
// If directory already exists, it may have been created below as a
// parent directory when processing a file. In this case, we must
// set the directory's permissions correctly.
if err := os.Chmod(target, file.Mode()); err != nil {
return 0, fmt.Errorf("setting permissions on %s: %v", target, err)
}
continue
}
// Create parent directories with full access. This only matters if a file
// appears in zipReader before its directory. In that case, the directory's
// permissions will be corrected when the directory entry itself is
// processed above.
if err := os.MkdirAll(filepath.Dir(target), 0777); err != nil {
return 0, fmt.Errorf("making parent directories for %s: %v", target, err)
}
// Actually copy the bytes, using func to get early defer calls
// (important for large numbers of files).
numFiles++
if err := func() (ferr error) {
reader, err := file.Open()
if err != nil {
return fmt.Errorf("opening archived file %s: %v", file.Name, err)
}
// Close the entry reader once this file is written.
defer func() {
if cerr := reader.Close(); cerr != nil && ferr == nil {
ferr = fmt.Errorf("closing archived file %s: %v", file.Name, cerr)
}
}()
writer, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE, file.Mode())
if err != nil {
return fmt.Errorf("opening target file %s: %v", target, err)
}
defer func() {
if cerr := writer.Close(); cerr != nil {
ferr = fmt.Errorf("closing target file %s: %v", target, cerr)
}
}()
if _, err := io.Copy(writer, reader); err != nil {
return fmt.Errorf("copying %s to %s: %v", file.Name, target, err)
}
return nil
}(); err != nil {
return 0, err
}
}
return numFiles, nil
}
// fetchFromTarGz is used when downloading a single .tar.gz of source files. It
// is responsible for fetching the .tar.gz file and extracting it into the
// destination folder.
func (gf *Fetcher) fetchFromTarGz(ctx context.Context) (err error) {
started := time.Now()
gf.log("Fetching archive %s.", formatGCSName(gf.Bucket, gf.Object, gf.Generation))
// Download the archive from GCS.
tgzDir := gf.StagingDir
j := job{
filename: gf.Object,
bucket: gf.Bucket,
object: gf.Object,
generation: gf.Generation,
destDirOverride: tgzDir,
}
report := gf.fetchObject(ctx, j)
if !report.success {
if err, ok := report.err.(*permissionError); ok {
gf.logErr(err.Error())
os.Exit(permissionDeniedExitStatus)
}
return fmt.Errorf("failed to download archive %s: %v", formatGCSName(gf.Bucket, gf.Object, gf.Generation), report.err)
}
// Extract into the destination directory.
untgzStart := time.Now()
tgzfile := filepath.Join(tgzDir, gf.Object)
f, err := os.Open(tgzfile)
if err != nil {
return err
}
// Register the file close before creating the gzip reader so the file is
// not leaked if gzip.NewReader fails.
defer func() {
if cerr := f.Close(); cerr != nil && err == nil {
err = fmt.Errorf("Failed to close file %q: %v", tgzfile, cerr)
}
}()
gzr, err := gzip.NewReader(f)
if err != nil {
return err
}
defer gzr.Close() // Runs before the file close above (defers are LIFO).
tr := tar.NewReader(gzr)
numFiles := 0
for {
h, err := tr.Next()
if err == io.EOF {
break
}
if err != nil {
return err
}
n := filepath.Join(gf.DestDir, h.Name)
switch h.Typeflag {
case tar.TypeDir:
if err := gf.OS.MkdirAll(n, h.FileInfo().Mode()); err != nil {
return err
}
case tar.TypeReg:
numFiles++
// Ensure parent directories exist in case the archive lacks explicit
// directory entries.
if err := gf.OS.MkdirAll(filepath.Dir(n), 0777); err != nil {
return err
}
if err := func() error {
f, err := os.OpenFile(n, os.O_WRONLY|os.O_CREATE, h.FileInfo().Mode())
if err != nil {
return err
}
defer f.Close()
_, err = io.Copy(f, tr)
return err
}(); err != nil {
return err
}
}
}
untgzDuration := time.Since(untgzStart)
if !gf.KeepSource {
// Remove the tgz file (best effort only, no harm if this fails).
if err := gf.OS.RemoveAll(tgzfile); err != nil {
gf.log("Failed to remove tgzfile %s, continuing: %v", tgzfile, err)
}
// Final cleanup of staging directory, which is only a temporary staging
// location for downloading the tgzfile in this case.
if err := gf.OS.RemoveAll(gf.StagingDir); err != nil {
gf.log("Failed to remove staging dir %q, continuing: %v", gf.StagingDir, err)
}
}
mib := float64(report.size) / 1024 / 1024
var mibps float64
tgzfileDuration := report.attempts[len(report.attempts)-1].duration
if tgzfileDuration > 0 {
mibps = mib / tgzfileDuration.Seconds()
}
gf.log("******************************************************")
gf.log("Status: SUCCESS")
gf.log("Started: %s", started.Format(time.RFC3339))
gf.log("Completed: %s", time.Now().Format(time.RFC3339))
gf.log("Total files: %6d", numFiles)
gf.log("MiB downloaded: %9.2f MiB", mib)
gf.log("MiB/s throughput: %9.2f MiB/s", mibps)
gf.log("Time for tgzfile: %9.2f s", tgzfileDuration.Seconds())
gf.log("Time to untgz: %9.2f s", untgzDuration.Seconds())
gf.log("Total time: %9.2f s", time.Since(started).Seconds())
gf.log("******************************************************")
return nil
}
// Fetch is the main entry point into Fetcher. Based on configuration,
// it pulls source from GCS into the destination directory.
func (gf *Fetcher) Fetch(ctx context.Context) error {
switch gf.SourceType {
case "Manifest":
return gf.fetchFromManifest(ctx)
case "Archive":
gf.log("WARNING: -type=Archive is deprecated; use -type=ZipArchive")
fallthrough
case "ZipArchive":
return gf.fetchFromZip(ctx)
case "TarGzArchive":
return gf.fetchFromTarGz(ctx)
default:
return fmt.Errorf("misconfigured GCSFetcher, unsupported -type %q", gf.SourceType)
}
}
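// formatGCSName renders an object as a gs:// URL, appending the generation
// when one is set, e.g. gs://my-bucket/path/file.txt#1234567890.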
func formatGCSName(bucket, object string, generation int64) string {
n := fmt.Sprintf("gs://%s/%s", bucket, object)
if generation > 0 {
n += fmt.Sprintf("#%d", generation)
}
return n
}