func (rw *ReadWriter) Upload(ctx context.Context) (int64, error)

in sharedlibraries/storage/storage.go [256:357]


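// Upload streams data from rw.Reader to the GCS object rw.ObjectName,
// optionally compressing, encrypting, or discarding it, and returns the
// number of bytes copied from the reader.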
func (rw *ReadWriter) Upload(ctx context.Context) (int64, error) {
	if rw.BucketHandle == nil {
		return 0, errors.New("no bucket defined")
	}

	object := rw.BucketHandle.Object(rw.ObjectName).Retryer(rw.retryOptions("Failed to upload data to Google Cloud Storage, retrying.")...)
	var writer io.WriteCloser
	var err error
	if rw.EncryptionKey != "" || rw.KMSKey != "" {
		log.CtxLogger(ctx).Infow("Encryption enabled for upload", "bucket", rw.BucketName, "object", rw.ObjectName)
	}
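	// Select the destination writer: a discard sink for dry runs, the XML
	// multipart uploader, or the standard GCS object writer.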
	if rw.DumpData {
		log.CtxLogger(ctx).Warnw("dump_data set to true, discarding data during upload", "bucket", rw.BucketName, "object", rw.ObjectName)
		writer = discardCloser{}
	} else if rw.XMLMultipartUpload {
		log.CtxLogger(ctx).Infow("XML Multipart API enabled for upload", "bucket", rw.BucketName, "object", rw.ObjectName, "workers", rw.XMLMultipartWorkers)
		writer, err = rw.NewMultipartWriter(ctx, defaultNewClient, google.DefaultTokenSource, google.CredentialsFromJSON)
		if err != nil {
			return 0, err
		}
	} else {
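		// Customer-supplied encryption keys arrive base64-encoded and must be
		// decoded before being attached to the object handle.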
		if rw.EncryptionKey != "" {
			decodedKey, err := base64.StdEncoding.DecodeString(rw.EncryptionKey)
			if err != nil {
				return 0, err
			}
			object = object.Key(decodedKey)
		}
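		// Configure the standard object writer; ChunkSize controls how much
		// data is buffered and sent per request.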
		objectWriter := object.NewWriter(ctx)
		objectWriter.KMSKeyName = rw.KMSKey
		objectWriter.ChunkSize = int(rw.ChunkSizeMb) * 1024 * 1024
		objectWriter.Metadata = rw.Metadata
		if !rw.CustomTime.IsZero() {
			objectWriter.CustomTime = rw.CustomTime
			log.CtxLogger(ctx).Infow("CustomTime set for upload", "bucket", rw.BucketName, "object", rw.ObjectName, "customTime", rw.CustomTime)
		}
		if rw.Compress {
			objectWriter.ObjectAttrs.ContentType = compressedContentType
		}
		if rw.StorageClass != "" {
			objectWriter.ObjectAttrs.StorageClass = rw.StorageClass
		}
		if rw.ObjectRetentionMode != "" {
			objectWriter.ObjectAttrs.Retention = &storage.ObjectRetention{
				Mode:        rw.ObjectRetentionMode,
				RetainUntil: rw.ObjectRetentionTime,
			}
			log.CtxLogger(ctx).Infow("ObjectRetention set for upload", "bucket", rw.BucketName, "object", rw.ObjectName, "retentionMode", rw.ObjectRetentionMode, "retentionTime", rw.ObjectRetentionTime)
		}
		// Set the per-chunk retry deadline to 10 minutes.
		// Note: this must be set even when custom retries are declared.
		objectWriter.ChunkRetryDeadline = 10 * time.Minute
		writer = objectWriter
	}
	rw.Writer = writer

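	// Fill in defaults for any unset fields. A zero chunk size makes the
	// client upload in a single request, so failed uploads cannot be retried.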
	rw = rw.defaultArgs()
	if rw.ChunkSizeMb == 0 {
		log.CtxLogger(ctx).Warn("ChunkSizeMb set to 0, uploads cannot be retried.")
	}

	log.CtxLogger(ctx).Infow("Upload starting", "bucket", rw.BucketName, "object", rw.ObjectName, "totalBytes", rw.TotalBytes)
	var bytesWritten int64
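	// When compressing, data flows Reader -> gzip -> rw, which wraps the
	// writer selected above; otherwise it is copied through rw directly.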
	if rw.Compress {
		log.CtxLogger(ctx).Infow("Compression enabled for upload", "bucket", rw.BucketName, "object", rw.ObjectName)
		gzipWriter := gzip.NewWriter(rw)
		if bytesWritten, err = rw.Copier(gzipWriter, rw.Reader); err != nil {
			return 0, err
		}
		// Closing the gzip writer flushes data to the underlying writer and writes the gzip footer.
		// The underlying writer must be closed after this call to flush its buffer to the bucket.
		if err := gzipWriter.Close(); err != nil {
			return 0, err
		}
	} else {
		if bytesWritten, err = rw.Copier(rw, rw.Reader); err != nil {
			return 0, err
		}
	}

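	// Close the underlying writer to flush remaining buffered data and
	// finalize the object in the bucket; time this as part of the transfer.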
	closeStart := time.Now()
	if err := writer.Close(); err != nil {
		return 0, err
	}
	rw.totalTransferTime += time.Since(closeStart)

	// Verify the object exists in the bucket and that bytesWritten matches its size.
	objectSize := int64(0)
	if !rw.DumpData && rw.VerifyUpload {
		attrs, err := rw.BucketHandle.Object(rw.ObjectName).Attrs(ctx)
		if err != nil {
			return bytesWritten, err
		}
		objectSize = attrs.Size
		if bytesWritten != objectSize && !rw.Compress {
			return bytesWritten, fmt.Errorf("upload error for object: %v, bytesWritten: %d does not equal the object's size: %d", rw.ObjectName, bytesWritten, objectSize)
		}
	}
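	// bytesTransferred and totalTransferTime are accumulated by the
	// ReadWriter's Write method during the copy (tracked outside this snippet).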
	avgTransferSpeedMBps := float64(rw.bytesTransferred) / rw.totalTransferTime.Seconds() / 1024 / 1024
	log.CtxLogger(ctx).Infow("Upload success", "bucket", rw.BucketName, "object", rw.ObjectName, "bytesWritten", bytesWritten, "bytesTransferred", rw.bytesTransferred, "totalBytes", rw.TotalBytes, "objectSizeInBucket", objectSize, "percentComplete", 100, "avgTransferSpeedMBps", fmt.Sprintf("%g", math.Round(avgTransferSpeedMBps)))
	return bytesWritten, nil
}
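
Below is a minimal usage sketch, not taken from the source file: the field
names match the snippet above, but the client construction, the Copier
value, and the file handling are assumptions about the surrounding package.

	ctx := context.Background()
	client, err := storage.NewClient(ctx) // cloud.google.com/go/storage
	if err != nil {
		return err
	}
	f, err := os.Open("/tmp/db.dump") // hypothetical source file
	if err != nil {
		return err
	}
	defer f.Close()

	rw := &ReadWriter{
		BucketHandle: client.Bucket("my-bucket"),
		BucketName:   "my-bucket",
		ObjectName:   "backups/db.dump",
		Reader:       f,
		Copier:       io.Copy, // assumed io.Copy-compatible signature
		ChunkSizeMb:  16,      // nonzero so failed chunks can be retried
		VerifyUpload: true,
	}
	bytesWritten, err := rw.Upload(ctx)
	if err != nil {
		return err
	}
	log.CtxLogger(ctx).Infow("done", "bytesWritten", bytesWritten)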