in gcs-fetcher/pkg/fetcher/fetcher.go [837:940]
// fetchFromTarGz downloads the gzipped tar archive gf.Object (from gf.Bucket,
// at gf.Generation) into gf.StagingDir and extracts it into gf.DestDir.
//
// Only directory and regular-file entries are extracted; every other entry
// type (symlinks, hard links, devices, ...) is skipped. Entries whose names
// would resolve outside gf.DestDir (e.g. "../x") are rejected with an error.
// Unless gf.KeepSource is set, the downloaded archive and the staging
// directory are removed afterwards on a best-effort basis. A summary of the
// run is logged on success.
//
// If the download fails with a *permissionError, the process exits with
// permissionDeniedExitStatus instead of returning.
func (gf *Fetcher) fetchFromTarGz(ctx context.Context) (err error) {
	started := time.Now()
	gf.log("Fetching archive %s.", formatGCSName(gf.Bucket, gf.Object, gf.Generation))

	// Download the archive from GCS into the staging directory.
	tgzDir := gf.StagingDir
	j := job{
		filename:        gf.Object,
		bucket:          gf.Bucket,
		object:          gf.Object,
		generation:      gf.Generation,
		destDirOverride: tgzDir,
	}
	report := gf.fetchObject(ctx, j)
	if !report.success {
		if perr, ok := report.err.(*permissionError); ok {
			gf.logErr(perr.Error())
			os.Exit(permissionDeniedExitStatus)
		}
		return fmt.Errorf("failed to download archive %s: %v", formatGCSName(gf.Bucket, gf.Object, gf.Generation), report.err)
	}

	// Untgz into the destination directory.
	untgzStart := time.Now()
	tgzfile := filepath.Join(tgzDir, gf.Object)
	f, err := os.Open(tgzfile)
	if err != nil {
		return err
	}
	// Register the close right after opening so the file is not leaked if
	// gzip.NewReader fails. A close error is only surfaced when nothing else
	// failed, so it never masks the original error.
	defer func() {
		if cerr := f.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("Failed to close file %q: %v", tgzfile, cerr)
		}
	}()
	gzr, err := gzip.NewReader(f)
	if err != nil {
		return err
	}
	// gzip.Reader.Close does not close f; it only releases reader state.
	defer gzr.Close()
	tr := tar.NewReader(gzr)

	numFiles := 0
	for {
		// Stop promptly if the caller cancels while a large archive extracts.
		if err := ctx.Err(); err != nil {
			return err
		}
		h, err := tr.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		n := filepath.Join(gf.DestDir, h.Name)
		// Reject entries that would escape DestDir (path traversal).
		// filepath.Join has already cleaned n, so an escape shows up as a
		// leading ".." component of the path relative to DestDir.
		rel, err := filepath.Rel(gf.DestDir, n)
		if err != nil || rel == ".." || (len(rel) >= 3 && rel[:3] == ".."+string(filepath.Separator)) {
			return fmt.Errorf("archive entry %q resolves outside destination directory %q", h.Name, gf.DestDir)
		}
		switch h.Typeflag {
		case tar.TypeDir:
			if err := gf.OS.MkdirAll(n, h.FileInfo().Mode()); err != nil {
				return err
			}
		case tar.TypeReg:
			// Archives may omit directory entries, so make sure the parent
			// directory exists before creating the file.
			if err := os.MkdirAll(filepath.Dir(n), 0755); err != nil {
				return err
			}
			if err := func() (werr error) {
				// O_TRUNC so a pre-existing, longer file does not keep stale
				// trailing bytes after the archive contents are written.
				out, err := os.OpenFile(n, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, h.FileInfo().Mode())
				if err != nil {
					return err
				}
				// For a file that was written to, a Close error can mean lost
				// data, so report it unless an earlier error already occurred.
				defer func() {
					if cerr := out.Close(); cerr != nil && werr == nil {
						werr = cerr
					}
				}()
				_, err = io.Copy(out, tr)
				return err
			}(); err != nil {
				return err
			}
			numFiles++
		}
	}
	untgzDuration := time.Since(untgzStart)

	if !gf.KeepSource {
		// Remove the tgz file (best effort only, no harm if this fails).
		if err := gf.OS.RemoveAll(tgzfile); err != nil {
			gf.log("Failed to remove tgzfile %s, continuing: %v", tgzfile, err)
		}
		// Final cleanup of staging directory, which is only a temporary staging
		// location for downloading the tgzfile in this case.
		if err := gf.OS.RemoveAll(gf.StagingDir); err != nil {
			gf.log("Failed to remove staging dir %q, continuing: %v", gf.StagingDir, err)
		}
	}

	mib := float64(report.size) / 1024 / 1024
	// Throughput is based on the last (successful) download attempt only.
	var tgzfileDuration time.Duration
	if len(report.attempts) > 0 {
		tgzfileDuration = report.attempts[len(report.attempts)-1].duration
	}
	var mibps float64
	if tgzfileDuration > 0 {
		mibps = mib / tgzfileDuration.Seconds()
	}
	gf.log("******************************************************")
	gf.log("Status: SUCCESS")
	gf.log("Started: %s", started.Format(time.RFC3339))
	gf.log("Completed: %s", time.Now().Format(time.RFC3339))
	gf.log("Total files: %6d", numFiles)
	gf.log("MiB downloaded: %9.2f MiB", mib)
	gf.log("MiB/s throughput: %9.2f MiB/s", mibps)
	gf.log("Time for tgzfile: %9.2f s", tgzfileDuration.Seconds())
	gf.log("Time to untgz: %9.2f s", untgzDuration.Seconds())
	gf.log("Total time: %9.2f s", time.Since(started).Seconds())
	gf.log("******************************************************")
	return nil
}