in azkustoingest/internal/queued/queued.go [347:409]
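// localToBlob uploads the local file at from into the given container,
// compressing it in transit when required, and returns the resulting blob
// URL together with the (uncompressed) source size.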
func (i *Ingestion) localToBlob(ctx context.Context, from string, client *azblob.Client, container string, props *properties.All) (string, int64, error) {
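	// Detect whether the source is already compressed (typically from its file
	// extension) and decide, together with the ingestion properties, whether it
	// should be gzipped in transit.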
	compression := utils.CompressionDiscovery(from)
	shouldCompress := ShouldCompress(props, compression)
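	// The blob name encodes the database, table, a timestamp, a fresh UUID, the
	// source file name, the compression state and the data format, so the
	// payload can be identified and traced in storage.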
	blobName := GenBlobName(i.db, i.table, nower(), filepath.Base(uuid.New().String()), filepath.Base(from), compression, shouldCompress, props.Ingestion.Additional.Format.String())
	file, err := os.Open(from)
	if err != nil {
		return "", 0, errors.ES(
			errors.OpFileIngest,
			errors.KLocalFileSystem,
			"problem retrieving source file %q: %s", from, err,
		).SetNoRetry()
	}
	defer file.Close()
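	// Stat the file up front; for the uncompressed path, its on-disk size is
	// what gets reported back to the caller.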
	stat, err := file.Stat()
	if err != nil {
		return "", 0, errors.ES(
			errors.OpFileIngest,
			errors.KLocalFileSystem,
			"could not stat the file (%s): %s", from, err,
		).SetNoRetry()
	}
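	// When the source needs compression, gzip it on the fly while streaming to
	// the block blob; the gzip wrapper counts the uncompressed bytes it
	// consumes so the original payload size can be reported after the upload.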
	if shouldCompress {
		gstream := gzip.New()
		gstream.Reset(file)
		_, err = i.uploadStream(
			ctx,
			gstream,
			client,
			container,
			blobName,
			&azblob.UploadStreamOptions{BlockSize: int64(i.bufferSize), Concurrency: i.maxBuffers},
		)
		if err != nil {
			return "", 0, errors.ES(errors.OpFileIngest, errors.KBlobstore, "problem uploading to Blob Storage: %s", err)
		}
		return fullUrl(client, container, blobName), gstream.InputSize(), nil
	}
	// The high-level UploadFile API uploads blocks in parallel for optimal
	// performance and can handle large files. It calls StageBlock/CommitBlockList
	// for files larger than 256 MB and a single Upload for anything smaller.
	_, err = i.uploadBlob(
		ctx,
		file,
		client,
		container,
		blobName,
		&azblob.UploadFileOptions{
			BlockSize:   BlockSize,
			Concurrency: Concurrency,
		},
	)
	if err != nil {
		return "", 0, errors.ES(errors.OpFileIngest, errors.KBlobstore, "problem uploading to Blob Storage: %s", err)
	}
	return fullUrl(client, container, blobName), stat.Size(), nil
}
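The compressed path above depends on the internal gzip helper's InputSize() being valid once the uploader has drained the stream. Below is a minimal sketch of how such a size-tracking gzip reader could be built, assuming it wraps compress/gzip around an io.Pipe. The real helper exposes New/Reset (as called above), which this sketch folds into a constructor for brevity; the names sizeTrackingGzip and newSizeTrackingGzip are illustrative, not the library's implementation.

package gzipsketch

import (
	"compress/gzip"
	"io"
)

// sizeTrackingGzip compresses everything read from src on the fly and counts
// the uncompressed bytes consumed, so the caller can report the original
// payload size after the upload has read the stream to EOF.
type sizeTrackingGzip struct {
	pr    *io.PipeReader
	input int64
}

func newSizeTrackingGzip(src io.Reader) *sizeTrackingGzip {
	pr, pw := io.Pipe()
	s := &sizeTrackingGzip{pr: pr}
	go func() {
		zw := gzip.NewWriter(pw)
		n, err := io.Copy(zw, src) // n = uncompressed bytes read from src
		s.input = n
		if cerr := zw.Close(); err == nil {
			err = cerr
		}
		pw.CloseWithError(err) // the reader sees EOF (or the error) after this
	}()
	return s
}

// Read yields the compressed stream; the blob uploader consumes this.
func (s *sizeTrackingGzip) Read(p []byte) (int, error) { return s.pr.Read(p) }

// InputSize reports the uncompressed byte count; it is only meaningful once
// Read has returned io.EOF.
func (s *sizeTrackingGzip) InputSize() int64 { return s.input }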