commands/helpers/cache_archiver.go (237 lines of code) (raw):

package helpers

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/sirupsen/logrus"
	"github.com/urfave/cli"
	"gocloud.dev/blob"
	_ "gocloud.dev/blob/azureblob" // Needed to register the Azure driver
	_ "gocloud.dev/blob/s3blob"    // Needed to register the AWS S3 driver
	"mvdan.cc/sh/v3/shell"

	"gitlab.com/gitlab-org/gitlab-runner/commands/helpers/archive"
	"gitlab.com/gitlab-org/gitlab-runner/commands/helpers/meter"
	"gitlab.com/gitlab-org/gitlab-runner/common"
	url_helpers "gitlab.com/gitlab-org/gitlab-runner/helpers/url"
	"gitlab.com/gitlab-org/gitlab-runner/log"
)

// CacheArchiverCommand implements the "cache-archiver" internal command: it
// archives the configured paths into a local cache file and, when a remote
// destination is configured, uploads the archive either to a pre-signed URL
// or to a Go Cloud (s3/azure) bucket.
type CacheArchiverCommand struct {
	fileArchiver
	retryHelper
	meter.TransferMeterCommand

	File                   string   `long:"file" description:"The path to file"`
	URL                    string   `long:"url" description:"URL of remote cache resource (pre-signed URL)"`
	GoCloudURL             string   `long:"gocloud-url" description:"Go Cloud URL of remote cache resource (requires credentials)"`
	Timeout                int      `long:"timeout" description:"Overall timeout for cache uploading request (in minutes)"`
	Headers                []string `long:"header" description:"HTTP headers to send with PUT request (in form of 'key:value')"`
	CompressionLevel       string   `long:"compression-level" env:"CACHE_COMPRESSION_LEVEL" description:"Compression level (fastest, fast, default, slow, slowest)"`
	CompressionFormat      string   `long:"compression-format" env:"CACHE_COMPRESSION_FORMAT" description:"Compression format (zip, tarzstd)"`
	MaxUploadedArchiveSize int64    `long:"max-uploaded-archive-size" env:"CACHE_MAX_UPLOADED_ARCHIVE_SIZE" description:"Limit the size of the cache archive being uploaded to cloud storage, in bytes."`
	EnvFile                string   `long:"env-file" description:"Filename containing environment variables to read"`

	client *CacheClient
	mux    *blob.URLMux
}

// getClient lazily creates (and then reuses) the HTTP cache client with the
// configured timeout.
func (c *CacheArchiverCommand) getClient() *CacheClient {
	if c.client == nil {
		c.client = NewCacheClient(c.Timeout)
	}
	return c.client
}

// upload sends the archive file to the configured remote destination. The
// unused int parameter satisfies the retryHelper callback signature.
func (c *CacheArchiverCommand) upload(_ int) error {
	file, err := os.Open(c.File)
	if err != nil {
		return err
	}
	defer func() { _ = file.Close() }()

	fi, err := file.Stat()
	if err != nil {
		return err
	}

	// Wrap the file in a transfer meter so upload progress is reported.
	rc := meter.NewReader(
		file,
		c.TransferMeterFrequency,
		meter.LabelledRateFormat(os.Stdout, "Uploading cache", fi.Size()),
	)
	defer rc.Close()

	if c.GoCloudURL != "" {
		return c.handleGoCloudURL(rc)
	}

	return c.handlePresignedURL(fi, rc)
}

// handlePresignedURL uploads the archive via HTTP PUT to a pre-signed URL.
// Request-creation and transport errors are wrapped in retryableErr so the
// retry helper attempts them again; server responses are classified by
// retryOnServerError.
func (c *CacheArchiverCommand) handlePresignedURL(fi os.FileInfo, file io.Reader) error {
	logrus.Infoln("Uploading", filepath.Base(c.File), "to", url_helpers.CleanURL(c.URL))

	req, err := http.NewRequest(http.MethodPut, c.URL, file)
	if err != nil {
		return retryableErr{err: err}
	}

	c.setHeaders(req, fi)
	req.ContentLength = fi.Size()

	resp, err := c.getClient().Do(req)
	if err != nil {
		return retryableErr{err: err}
	}
	defer func() { _ = resp.Body.Close() }()

	return retryOnServerError(resp)
}

// handleGoCloudURL uploads the archive to a Go Cloud bucket (e.g. s3://,
// azblob://) using credentials taken from the environment, optionally loaded
// from EnvFile. The object name is derived from the URL path.
func (c *CacheArchiverCommand) handleGoCloudURL(file io.Reader) error {
	logrus.Infoln("Uploading", filepath.Base(c.File), "to", url_helpers.CleanURL(c.GoCloudURL))

	if c.mux == nil {
		c.mux = blob.DefaultURLMux()
	}

	ctx, cancelWrite := context.WithCancel(context.Background())
	defer cancelWrite()

	u, err := url.Parse(c.GoCloudURL)
	if err != nil {
		return err
	}

	err = loadEnvFile(c.EnvFile)
	if err != nil {
		return err
	}

	objectName := strings.TrimLeft(u.Path, "/")
	if objectName == "" {
		return fmt.Errorf("no object name provided")
	}

	b, err := c.mux.OpenBucket(ctx, c.GoCloudURL)
	if err != nil {
		return err
	}
	defer b.Close()

	writer, err := b.NewWriter(ctx, objectName, nil)
	if err != nil {
		return err
	}

	if _, err = io.Copy(writer, file); err != nil {
		// Cancel the context before closing so the partial write is aborted
		// rather than committed (per the Go Cloud writer contract).
		cancelWrite()
		if writerErr := writer.Close(); writerErr != nil {
			logrus.WithError(writerErr).Error("error closing Go cloud upload after copy failure")
		}
		return err
	}

	// The write is only committed when Close returns nil.
	if err := writer.Close(); err != nil {
		return err
	}

	return nil
}

// createZipFile archives the enumerated files into a temporary file in the
// target directory and atomically renames it to filename. It returns the
// archive size in bytes. Despite the name, the format may also be tar+zstd
// depending on CompressionFormat (anything unrecognized falls back to zip).
func (c *CacheArchiverCommand) createZipFile(filename string) (int64, error) {
	err := os.MkdirAll(filepath.Dir(filename), 0o700)
	if err != nil {
		return 0, err
	}

	f, err := os.CreateTemp(filepath.Dir(filename), "archive_")
	if err != nil {
		return 0, err
	}
	// Best-effort cleanup; after a successful rename Remove fails harmlessly.
	defer os.Remove(f.Name())
	defer f.Close()

	logrus.Debugln("Temporary file:", f.Name())

	// Normalize the requested format, defaulting to zip.
	switch strings.ToLower(c.CompressionFormat) {
	case string(common.ArtifactFormatTarZstd):
		c.CompressionFormat = string(common.ArtifactFormatTarZstd)
	default:
		c.CompressionFormat = string(common.ArtifactFormatZip)
	}

	archiver, err := archive.NewArchiver(archive.Format(c.CompressionFormat), f, c.wd, GetCompressionLevel(c.CompressionLevel))
	if err != nil {
		return 0, err
	}

	// Create archive
	err = archiver.Archive(context.Background(), c.files)
	if err != nil {
		return 0, err
	}

	info, err := f.Stat()
	if err != nil {
		return 0, err
	}

	err = f.Close()
	if err != nil {
		return 0, err
	}

	return info.Size(), os.Rename(f.Name(), filename)
}

// Execute is the cli entry point: it enumerates the configured paths,
// rebuilds the archive only when the file list changed, and uploads it when
// a remote cache is configured. Fatal errors terminate the process.
func (c *CacheArchiverCommand) Execute(*cli.Context) {
	log.SetRunnerFormatter()

	c.normalizeArgs()

	// Enumerate files
	err := c.enumerate()
	if err != nil {
		logrus.Fatalln(err)
	}

	// Check if list of files changed
	if !c.isFileChanged(c.File) {
		logrus.Infoln("Archive is up to date!")
		return
	}

	// Create archive
	size, err := c.createZipFile(c.File)
	if err != nil {
		logrus.Fatalln(err)
	}

	c.uploadArchiveIfNeeded(size)
}

// normalizeArgs validates required flags and shell-expands the configured
// include/exclude path patterns in place.
func (c *CacheArchiverCommand) normalizeArgs() {
	if c.File == "" {
		logrus.Fatalln("Missing --file")
	}

	expandPaths(c.Paths)
	expandPaths(c.Exclude)
}

// expandPaths shell-expands each pattern in place. Patterns that fail to
// expand are left unchanged and reported with a warning.
func expandPaths(paths []string) {
	for idx := range paths {
		expanded, err := shell.Expand(paths[idx], nil)
		if err != nil {
			// Log the original pattern: on error the expansion result is not
			// meaningful (the previous code logged the empty result instead).
			logrus.Warnf("invalid path %q: %v", paths[idx], err)
			continue
		}
		paths[idx] = expanded
	}
}

// uploadArchiveIfNeeded uploads the archive unless no remote URL is
// configured or the archive exceeds MaxUploadedArchiveSize (0 = unlimited).
// Uploads are retried via the embedded retryHelper.
func (c *CacheArchiverCommand) uploadArchiveIfNeeded(size int64) {
	if c.URL == "" && c.GoCloudURL == "" {
		logrus.Infoln(
			"No URL provided, cache will not be uploaded to shared cache server. " +
				"Cache will be stored only locally.")
		return
	}

	if c.MaxUploadedArchiveSize != 0 && size > c.MaxUploadedArchiveSize {
		logrus.Infoln(fmt.Sprintf("Cache archive size (%d) is too big (Limit is set to %d). "+
			"Cache will be stored only locally.", size, c.MaxUploadedArchiveSize))
		return
	}

	err := c.doRetry(c.upload)
	if err != nil {
		logrus.Fatalln(err)
	}
}

// setHeaders applies the user-supplied "key:value" headers to the request,
// then fills in defaults: Content-Type (unless the user set one) and
// Last-Modified from the archive's mtime.
func (c *CacheArchiverCommand) setHeaders(req *http.Request, fi os.FileInfo) {
	for _, header := range c.Headers {
		parsed := strings.SplitN(header, ":", 2)
		if len(parsed) != 2 {
			// Silently skip malformed header flags.
			continue
		}
		req.Header.Set(strings.TrimSpace(parsed[0]), strings.TrimSpace(parsed[1]))
	}

	// Set default headers. But don't override custom Content-Type.
	if req.Header.Get(common.ContentType) == "" {
		req.Header.Set(common.ContentType, "application/octet-stream")
	}
	req.Header.Set("Last-Modified", fi.ModTime().UTC().Format(http.TimeFormat))
}

// init registers the command with the runner's internal command registry,
// preconfigured to retry failed uploads twice with a one-second delay.
func init() {
	common.RegisterCommand2(
		"cache-archiver",
		"create and upload cache artifacts (internal)",
		&CacheArchiverCommand{
			retryHelper: retryHelper{
				Retry:     2,
				RetryTime: time.Second,
			},
		},
	)
}