in sharedlibraries/storage/storage.go [256:357]
// Upload copies the data from rw.Reader into the object rw.ObjectName of
// rw.BucketHandle and returns the byte count reported by rw.Copier.
//
// The destination writer is chosen in this order:
//   - rw.DumpData: a discarding writer, nothing is sent to the bucket.
//   - rw.XMLMultipartUpload: the writer returned by rw.NewMultipartWriter.
//   - otherwise: a standard object writer configured from the ReadWriter
//     fields (customer-supplied encryption key, KMS key, chunk size, metadata,
//     custom time, content type, storage class and object retention).
//
// When rw.Compress is set the data is gzip-compressed before it reaches the
// destination writer.
//
// When rw.VerifyUpload is set (and data is not being discarded) the object's
// attributes are fetched after the upload; for uncompressed uploads the
// object's size must equal the returned byte count.
//
// An error is returned if no bucket handle is set, the encryption key is not
// valid base64, the multipart writer cannot be created, copying or closing
// fails, or verification fails. Failures before or during close return 0;
// verification failures return the byte count alongside the error.
func (rw *ReadWriter) Upload(ctx context.Context) (int64, error) {
	if rw.BucketHandle == nil {
		return 0, errors.New("no bucket defined")
	}
	object := rw.BucketHandle.Object(rw.ObjectName).Retryer(rw.retryOptions("Failed to upload data to Google Cloud Storage, retrying.")...)

	var writer io.WriteCloser
	var err error
	if rw.EncryptionKey != "" || rw.KMSKey != "" {
		log.CtxLogger(ctx).Infow("Encryption enabled for upload", "bucket", rw.BucketName, "object", rw.ObjectName)
	}

	switch {
	case rw.DumpData:
		log.CtxLogger(ctx).Warnw("dump_data set to true, discarding data during upload", "bucket", rw.BucketName, "object", rw.ObjectName)
		writer = discardCloser{}
	case rw.XMLMultipartUpload:
		log.CtxLogger(ctx).Infow("XML Multipart API enabled for upload", "bucket", rw.BucketName, "object", rw.ObjectName, "workers", rw.XMLMultipartWorkers)
		// NOTE(review): the multipart writer keeps using the caller's ctx because
		// its cancellation semantics are not visible from this function.
		writer, err = rw.NewMultipartWriter(ctx, defaultNewClient, google.DefaultTokenSource, google.CredentialsFromJSON)
		if err != nil {
			return 0, err
		}
	default:
		if rw.EncryptionKey != "" {
			decodedKey, err := base64.StdEncoding.DecodeString(rw.EncryptionKey)
			if err != nil {
				return 0, err
			}
			object = object.Key(decodedKey)
		}
		// A storage.Writer must either be closed or have its context cancelled.
		// Closing after a failed copy would commit a truncated object, so the
		// writer gets its own cancellable context instead: every early error
		// return below aborts the in-flight upload and releases its resources.
		// After a successful Close the deferred cancel has no effect.
		writerCtx, cancel := context.WithCancel(ctx)
		defer cancel()

		objectWriter := object.NewWriter(writerCtx)
		objectWriter.KMSKeyName = rw.KMSKey
		objectWriter.ChunkSize = int(rw.ChunkSizeMb) * 1024 * 1024
		objectWriter.Metadata = rw.Metadata
		if !rw.CustomTime.IsZero() {
			objectWriter.CustomTime = rw.CustomTime
			log.CtxLogger(ctx).Infow("CustomTime set for upload", "bucket", rw.BucketName, "object", rw.ObjectName, "customTime", rw.CustomTime)
		}
		if rw.Compress {
			objectWriter.ObjectAttrs.ContentType = compressedContentType
		}
		if rw.StorageClass != "" {
			objectWriter.ObjectAttrs.StorageClass = rw.StorageClass
		}
		if rw.ObjectRetentionMode != "" {
			objectWriter.ObjectAttrs.Retention = &storage.ObjectRetention{
				Mode:        rw.ObjectRetentionMode,
				RetainUntil: rw.ObjectRetentionTime,
			}
			log.CtxLogger(ctx).Infow("ObjectRetention set for upload", "bucket", rw.BucketName, "object", rw.ObjectName, "retentionMode", rw.ObjectRetentionMode, "retentionTime", rw.ObjectRetentionTime)
		}
		// Set an individual chunk retry deadline to 10 minutes.
		// Note, even with custom retries declared this value must also be set.
		objectWriter.ChunkRetryDeadline = 10 * time.Minute
		writer = objectWriter
	}

	// The destination is installed on rw before defaultArgs runs; this ordering
	// is kept exactly as in the original implementation.
	rw.Writer = writer
	rw = rw.defaultArgs()
	if rw.ChunkSizeMb == 0 {
		log.CtxLogger(ctx).Warn("ChunkSizeMb set to 0, uploads cannot be retried.")
	}
	log.CtxLogger(ctx).Infow("Upload starting", "bucket", rw.BucketName, "object", rw.ObjectName, "totalBytes", rw.TotalBytes)

	// Data is copied into rw itself rather than directly into writer, so every
	// write passes through the ReadWriter's own Write method.
	var bytesWritten int64
	if rw.Compress {
		log.CtxLogger(ctx).Infow("Compression enabled for upload", "bucket", rw.BucketName, "object", rw.ObjectName)
		gzipWriter := gzip.NewWriter(rw)
		if bytesWritten, err = rw.Copier(gzipWriter, rw.Reader); err != nil {
			return 0, err
		}
		// Closing the gzip writer flushes data to the underlying writer and writes the gzip footer.
		// The underlying writer must be closed after this call to flush its buffer to the bucket.
		if err := gzipWriter.Close(); err != nil {
			return 0, err
		}
	} else {
		if bytesWritten, err = rw.Copier(rw, rw.Reader); err != nil {
			return 0, err
		}
	}

	// The time spent in Close counts towards the total transfer time.
	closeStart := time.Now()
	if err := writer.Close(); err != nil {
		return 0, err
	}
	rw.totalTransferTime += time.Since(closeStart)

	// Verify object is in the bucket and bytesWritten matches the object's size in the bucket.
	objectSize := int64(0)
	if !rw.DumpData && rw.VerifyUpload {
		attrs, err := rw.BucketHandle.Object(rw.ObjectName).Attrs(ctx)
		if err != nil {
			return bytesWritten, err
		}
		objectSize = attrs.Size
		// The size comparison is skipped for compressed uploads.
		if bytesWritten != objectSize && !rw.Compress {
			return bytesWritten, fmt.Errorf("upload error for object: %v, bytesWritten: %d does not equal the object's size: %d", rw.ObjectName, bytesWritten, objectSize)
		}
	}

	// Guard against a zero elapsed time so the log never reports +Inf or NaN.
	var avgTransferSpeedMBps float64
	if seconds := rw.totalTransferTime.Seconds(); seconds > 0 {
		avgTransferSpeedMBps = float64(rw.bytesTransferred) / seconds / 1024 / 1024
	}
	log.CtxLogger(ctx).Infow("Upload success", "bucket", rw.BucketName, "object", rw.ObjectName, "bytesWritten", bytesWritten, "bytesTransferred", rw.bytesTransferred, "totalBytes", rw.TotalBytes, "objectSizeInBucket", objectSize, "percentComplete", 100, "avgTransferSpeedMBps", fmt.Sprintf("%g", math.Round(avgTransferSpeedMBps)))
	return bytesWritten, nil
}