func()

in gcs-fetcher/pkg/fetcher/fetcher.go [837:940]


// fetchFromTarGz downloads the gzip-compressed tar archive identified by
// gf.Bucket, gf.Object and gf.Generation into gf.StagingDir and unpacks it
// under gf.DestDir.
//
// Only directory and regular-file entries are materialized; every other entry
// type is skipped. Entries whose name would resolve outside gf.DestDir are
// rejected with an error. Unless gf.KeepSource is set, the downloaded archive
// and the staging directory are removed afterwards on a best-effort basis.
//
// If the download fails with a *permissionError the process exits with
// permissionDeniedExitStatus. Any other download, read, or extraction failure
// is returned as an error. On success a summary is logged and nil is returned.
func (gf *Fetcher) fetchFromTarGz(ctx context.Context) (err error) {
	started := time.Now()
	gf.log("Fetching archive %s.", formatGCSName(gf.Bucket, gf.Object, gf.Generation))

	// Download the archive from GCS.
	tgzDir := gf.StagingDir
	j := job{
		filename:        gf.Object,
		bucket:          gf.Bucket,
		object:          gf.Object,
		generation:      gf.Generation,
		destDirOverride: tgzDir,
	}
	report := gf.fetchObject(ctx, j)
	if !report.success {
		if err, ok := report.err.(*permissionError); ok {
			gf.logErr(err.Error())
			os.Exit(permissionDeniedExitStatus)
		}
		return fmt.Errorf("failed to download archive %s: %v", formatGCSName(gf.Bucket, gf.Object, gf.Generation), report.err)
	}

	// Untgz into the destination directory
	untgzStart := time.Now()
	tgzfile := filepath.Join(tgzDir, gf.Object)
	f, err := os.Open(tgzfile)
	if err != nil {
		return err
	}
	// Register the close immediately so the file is released on every return
	// path, including a gzip header failure. A close error is only surfaced
	// when nothing else went wrong, so it never masks the real cause.
	defer func() {
		if cerr := f.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("Failed to close file %q: %v", tgzfile, cerr)
		}
	}()

	gzr, err := gzip.NewReader(f)
	if err != nil {
		return err
	}
	// Closing the gzip reader does not close f; any decompression error would
	// already have been reported by the reads below, so the result is ignored.
	defer gzr.Close()
	tr := tar.NewReader(gzr)

	// Prefix of a relative path that climbs out of its base directory.
	parentPrefix := ".." + string(filepath.Separator)

	// numFiles counts every archive entry processed, whatever its type.
	numFiles := 0
	for {
		h, err := tr.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		numFiles++

		n := filepath.Join(gf.DestDir, h.Name)
		// Guard against path traversal: an entry name containing ".."
		// components must not be able to write outside the destination.
		rel, err := filepath.Rel(gf.DestDir, n)
		if err != nil || rel == ".." || (len(rel) >= len(parentPrefix) && rel[:len(parentPrefix)] == parentPrefix) {
			return fmt.Errorf("archive entry %q escapes destination directory %q", h.Name, gf.DestDir)
		}

		switch h.Typeflag {
		case tar.TypeDir:
			if err := gf.OS.MkdirAll(n, h.FileInfo().Mode()); err != nil {
				return err
			}
		case tar.TypeReg:
			// Run in a closure so each output file is closed as soon as it has
			// been written instead of at function return.
			if err := func() (werr error) {
				// O_TRUNC ensures a pre-existing, longer file does not keep
				// stale trailing bytes after being overwritten.
				out, err := os.OpenFile(n, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, h.FileInfo().Mode())
				if err != nil {
					return err
				}
				defer func() {
					// A failed close can mean the data never reached disk.
					if cerr := out.Close(); cerr != nil && werr == nil {
						werr = cerr
					}
				}()
				_, err = io.Copy(out, tr)
				return err
			}(); err != nil {
				return err
			}
		}
	}
	untgzDuration := time.Since(untgzStart)

	if !gf.KeepSource {
		// Remove the tgz file (best effort only, no harm if this fails).
		if err := gf.OS.RemoveAll(tgzfile); err != nil {
			gf.log("Failed to remove tgzfile %s, continuing: %v", tgzfile, err)
		}

		// Final cleanup of staging directory, which is only a temporary staging
		// location for downloading the tgzfile in this case.
		if err := gf.OS.RemoveAll(gf.StagingDir); err != nil {
			gf.log("Failed to remove staging dir %q, continuing: %v", gf.StagingDir, err)
		}
	}

	// Throughput is based on the duration of the last download attempt.
	mib := float64(report.size) / 1024 / 1024
	var mibps float64
	tgzfileDuration := report.attempts[len(report.attempts)-1].duration
	if tgzfileDuration > 0 {
		mibps = mib / tgzfileDuration.Seconds()
	}
	gf.log("******************************************************")
	gf.log("Status:                      SUCCESS")
	gf.log("Started:                     %s", started.Format(time.RFC3339))
	gf.log("Completed:                   %s", time.Now().Format(time.RFC3339))
	gf.log("Total files:       %6d", numFiles)
	gf.log("MiB downloaded:    %9.2f MiB", mib)
	gf.log("MiB/s throughput:  %9.2f MiB/s", mibps)
	gf.log("Time for tgzfile:  %9.2f s", tgzfileDuration.Seconds())
	gf.log("Time to untgz:     %9.2f s", untgzDuration.Seconds())
	gf.log("Total time:        %9.2f s", time.Since(started).Seconds())
	gf.log("******************************************************")
	return nil
}