in azkustoingest/internal/queued/queued.go [168:238]
// Reader uploads the content of reader to a Blob Storage container and then hands the uploaded
// blob to i.Blob.
//
// Before anything is uploaded it asks the resource manager for the ranked storage containers and
// the ranked storage queues and fails with a no-retry error when either list is empty, so that no
// data is uploaded when there is nowhere to upload to or no queue to announce the blob on. When
// ShouldCompress reports true the stream is wrapped with gzip.Compress.
//
// Containers are tried in the order returned by GetRankedStorageContainers, at most
// StorageMaxRetryPolicy of them, and the outcome of every attempt is reported through
// ReportStorageResourceResult. A failed upload is only retried on the next container while no
// bytes have been read from the stream: once an attempt has consumed data, the remaining stream
// no longer represents the whole input, so the method returns an error instead of uploading a
// truncated blob.
//
// On a successful upload it returns the generated blob name together with the error (if any)
// returned by i.Blob. The size passed to i.Blob is gz.InputSize() when the stream was compressed
// and 0 otherwise.
func (i *Ingestion) Reader(ctx context.Context, reader io.Reader, props properties.All) (string, error) {
	containers, err := i.mgr.GetRankedStorageContainers()
	if err != nil {
		return "", err
	}
	if len(containers) == 0 {
		return "", errors.ES(
			errors.OpFileIngest,
			errors.KBlobstore,
			"no Blob Storage container resources are defined, there is no container to upload to",
		).SetNoRetry()
	}

	queues, err := i.mgr.GetRankedStorageQueues()
	if err != nil {
		return "", err
	}
	// Check for a queue up front so we don't upload a file and then find we have no Kusto queue
	// to stick it in.
	if len(queues) == 0 {
		return "", errors.ES(errors.OpFileIngest, errors.KBlobstore, "no Kusto queue resources are defined, there is no queue to upload to").SetNoRetry()
	}

	compression := utils.CompressionDiscovery(props.Source.OriginalSource)
	shouldCompress := ShouldCompress(&props, compression)
	blobName := GenBlobName(i.db, i.table, nower(), filepath.Base(uuid.New().String()), filepath.Base(props.Source.OriginalSource), compression, shouldCompress, props.Ingestion.Additional.Format.String())

	if shouldCompress {
		reader = gzip.Compress(reader)
	}

	// Count the bytes the uploader pulls from the (possibly compressed) stream. The stream cannot
	// be replayed, so this tells us whether a failed attempt may safely be retried elsewhere.
	body := &uploadByteCounter{src: reader}

	// Go over the containers and try to upload the stream to each one. If we succeed, we are done.
	for attempts, containerUri := range containers {
		if attempts >= StorageMaxRetryPolicy {
			return "", errors.ES(errors.OpFileIngest, errors.KBlobstore, "max retry policy reached").SetNoRetry()
		}

		client, containerName, err := i.upstreamContainer(containerUri)
		if err != nil {
			// Nothing has been read from the stream yet, moving on to the next container is safe.
			i.mgr.ReportStorageResourceResult(containerUri.Account(), false)
			continue
		}

		_, err = i.uploadStream(
			ctx,
			body,
			client,
			containerName,
			blobName,
			&azblob.UploadStreamOptions{BlockSize: int64(i.bufferSize), Concurrency: i.maxBuffers},
		)
		if err != nil {
			i.mgr.ReportStorageResourceResult(containerUri.Account(), false)
			if body.read > 0 {
				// Part of the input is gone; another attempt would upload only the remainder and
				// report success for a truncated blob.
				return blobName, errors.ES(
					errors.OpFileIngest,
					errors.KBlobstore,
					"problem uploading to Blob Storage, the input stream was partially consumed and cannot be replayed to another container",
				)
			}
			continue
		}
		i.mgr.ReportStorageResourceResult(containerUri.Account(), true)

		// Only the gzip streamer exposes the size of the original input; otherwise 0 is passed on.
		size := int64(0)
		if gz, ok := reader.(*gzip.Streamer); ok {
			size = gz.InputSize()
		}

		err = i.Blob(ctx, fullUrl(client, containerName, blobName), size, props)
		return blobName, err
	}

	return blobName, errors.ES(errors.OpFileIngest, errors.KBlobstore, "problem uploading to Blob Storage")
}

// uploadByteCounter is an io.Reader that forwards to src and records how many bytes were read.
type uploadByteCounter struct {
	src  io.Reader
	read int64
}

// Read implements io.Reader and adds the number of bytes returned by src to c.read.
func (c *uploadByteCounter) Read(p []byte) (int, error) {
	n, err := c.src.Read(p)
	c.read += int64(n)
	return n, err
}