in azkustoingest/internal/queued/queued.go [241:292]
// Blob enqueues an ingestion message describing the blob at from.
//
// from is stored as the blob path of the ingestion message. fileSize, when
// non-zero, is stored as the raw data size. props carries the remaining
// ingestion properties; the format is completed from the blob name via
// CompleteFormatFromFileName before the message is marshaled.
//
// The ranked storage queues returned by the resource manager are tried in
// order, at most StorageMaxRetryPolicy of them, until one accepts the message.
// The outcome of each attempt is reported back to the manager. On success the
// result of props.ApplyDeleteLocalSourceOption is returned. When no queue
// accepts the message, the returned error includes the error of the last
// failed attempt. If ctx is done when an enqueue fails, Blob stops immediately
// instead of trying the remaining queues.
func (i *Ingestion) Blob(ctx context.Context, from string, fileSize int64, props properties.All) error {
	// To learn more about ingestion properties, go to:
	// https://docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/#ingestion-properties
	// To learn more about ingestion methods go to:
	// https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
	props.Ingestion.BlobPath = from
	if fileSize != 0 {
		props.Ingestion.RawDataSize = fileSize
	}
	props.Ingestion.RetainBlobOnSuccess = !props.Source.DeleteLocalSource
	props.Ingestion.ApplicationForTracing = i.applicationForTracing
	props.Ingestion.ClientVersionForTracing = i.clientVersionForTracing

	if err := CompleteFormatFromFileName(&props, from); err != nil {
		return err
	}

	j, err := props.Ingestion.MarshalJSONString()
	if err != nil {
		return errors.ES(errors.OpFileIngest, errors.KInternal, "could not marshal the ingestion blob info: %s", err).SetNoRetry()
	}

	queueResources, err := i.mgr.GetRankedStorageQueues()
	if err != nil {
		return err
	}

	// lastErr remembers why the most recent attempt failed so the final error
	// can tell the caller what actually went wrong.
	var lastErr error

	// Go over all the queues and try to upload the file to each one. If we succeed, we are done.
	for attempts, queueURI := range queueResources {
		if attempts >= StorageMaxRetryPolicy {
			if lastErr != nil {
				return errors.ES(errors.OpFileIngest, errors.KBlobstore, "max retry policy reached: %s", lastErr).SetNoRetry()
			}
			return errors.ES(errors.OpFileIngest, errors.KBlobstore, "max retry policy reached").SetNoRetry()
		}

		queue, err := i.upstreamQueue(queueURI)
		if err != nil {
			lastErr = err
			i.mgr.ReportStorageResourceResult(queueURI.Account(), false)
			continue
		}

		if _, err := queue.EnqueueMessage(ctx, j, nil); err != nil {
			// A cancelled or expired context makes every further attempt fail
			// as well, and the failure is not the storage account's fault, so
			// stop here without reporting it against the account.
			// NOTE(review): presumably the reported results feed the queue
			// ranking; confirm against the resource manager.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return errors.ES(errors.OpFileIngest, errors.KBlobstore, "context done while enqueueing the ingestion message: %s", ctxErr)
			}
			lastErr = err
			i.mgr.ReportStorageResourceResult(queueURI.Account(), false)
			continue
		}

		i.mgr.ReportStorageResourceResult(queueURI.Account(), true)
		return props.ApplyDeleteLocalSourceOption()
	}

	if lastErr != nil {
		return errors.ES(errors.OpFileIngest, errors.KBlobstore, "could not upload file to any queue: %s", lastErr)
	}
	// No queue was available to try at all.
	return errors.ES(errors.OpFileIngest, errors.KBlobstore, "could not upload file to any queue")
}