in azkustoingest/internal/queued/queued.go [111:164]
func (i *Ingestion) Local(ctx context.Context, from string, props properties.All) error {
containers, err := i.mgr.GetRankedStorageContainers()
if err != nil {
return err
}
if len(containers) == 0 {
return errors.ES(
errors.OpFileIngest,
errors.KBlobstore,
"no Blob Storage container resources are defined, there is no container to upload to",
).SetNoRetry()
}
queues, err := i.mgr.GetRankedStorageQueues()
if err != nil {
return err
}
// We want to check the queue size here so we don't upload a file and then find we don't have a Kusto queue to stick
// it in. If we don't have a container, that is handled by containerQueue().
if len(queues) == 0 {
return errors.ES(errors.OpFileIngest, errors.KBlobstore, "no Kusto queue resources are defined, there is no queue to upload to").SetNoRetry()
}
// Go over all the containers and try to upload the file to each one. If we succeed, we are done.
for attempts, containerUri := range containers {
if attempts >= StorageMaxRetryPolicy {
return errors.ES(errors.OpFileIngest, errors.KBlobstore, "max retry policy reached").SetNoRetry()
}
client, containerName, err := i.upstreamContainer(containerUri)
if err != nil {
i.mgr.ReportStorageResourceResult(containerUri.Account(), false)
continue
}
blobURL, size, err := i.localToBlob(ctx, from, client, containerName, &props)
if err == nil {
i.mgr.ReportStorageResourceResult(containerUri.Account(), true)
return i.Blob(ctx, blobURL, size, props)
}
// check if the error is retryable
if errors.Retry(err) {
i.mgr.ReportStorageResourceResult(containerUri.Account(), false)
continue
} else {
return err
}
}
return errors.ES(errors.OpFileIngest, errors.KBlobstore, "could not upload file to any container")
}