in sharedlibraries/storage/multipart.go [119:165]
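// NewMultipartWriter creates a MultipartWriter for the object: it fetches an
// auth token, initiates the multipart upload, and starts
// rw.XMLMultipartWorkers upload workers, each with its own part buffer and a
// dedicated HTTP client.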
func (rw *ReadWriter) NewMultipartWriter(ctx context.Context, newClient HTTPClient, tokenGetter DefaultTokenGetter, jsonCredentialsGetter JSONCredentialsGetter) (*MultipartWriter, error) {
	if rw.XMLMultipartEndpoint == "" {
		rw.XMLMultipartEndpoint = defaultClientEndpoint
	}
	baseURL := fmt.Sprintf("https://%s.%s/%s", rw.BucketName, rw.XMLMultipartEndpoint, rw.ObjectName)
	token, err := token(ctx, rw.XMLMultipartServiceAccount, tokenGetter, jsonCredentialsGetter)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch auth token, err: %w", err)
	}
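	// partSizeBytes converts ChunkSizeMb to bytes; each upload worker below
	// allocates a buffer of this size.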
	w := &MultipartWriter{
		bucket:                 rw.BucketHandle,
		objectName:             rw.ObjectName,
		fileType:               rw.Metadata["X-Backup-Type"],
		token:                  token,
		httpClient:             newClient(10*time.Minute, defaultTransport()),
		baseURL:                baseURL,
		storageClass:           rw.StorageClass,
		partSizeBytes:          rw.ChunkSizeMb * 1024 * 1024,
		partNum:                1,
		maxRetries:             rw.MaxRetries,
		retryBackoffInitial:    rw.RetryBackoffInitial,
		retryBackoffMax:        rw.RetryBackoffMax,
		retryBackoffMultiplier: rw.RetryBackoffMultiplier,
		mu:                     &sync.Mutex{},
		parts:                  make(map[int64]objectPart),
		workers:                make([]*uploadWorker, rw.XMLMultipartWorkers),
		idleWorkers:            make(chan *uploadWorker, rw.XMLMultipartWorkers),
		customTime:             rw.CustomTime,
		retentionMode:          rw.ObjectRetentionMode,
		retentionTime:          rw.ObjectRetentionTime,
	}
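	// Initiating the upload returns the upload ID that identifies this
	// multipart upload for the part uploads that follow.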
	if w.uploadID, err = w.initMultipartUpload(); err != nil {
		return nil, fmt.Errorf("failed to init multipart upload, err: %w", err)
	}
	// Each worker needs a dedicated transport to prevent throttling.
	for i := 0; i < int(rw.XMLMultipartWorkers); i++ {
		w.workers[i] = &uploadWorker{
			w:          w,
			httpClient: newClient(10*time.Minute, defaultTransport()),
			buffer:     make([]byte, w.partSizeBytes),
		}
		w.idleWorkers <- w.workers[i]
	}
	return w, nil
}