func()

in azkustoingest/internal/queued/queued.go [111:164]


// Local ingests the local source named by from: it is uploaded to one of the
// ranked storage containers through localToBlob and, once an upload succeeds,
// the resulting blob is handed to Blob together with props.
//
// Containers are tried in the order returned by GetRankedStorageContainers. A
// container whose client cannot be built, or whose upload fails with an error
// that errors.Retry reports as retryable, is reported to the resource manager
// as failed and the next container is tried. Any other upload error is returned
// unchanged. No more than StorageMaxRetryPolicy containers are attempted.
//
// A no-retry error is returned when no containers or no queues are defined, or
// when the attempt limit is reached before an upload succeeds.
func (i *Ingestion) Local(ctx context.Context, from string, props properties.All) error {
	targets, err := i.mgr.GetRankedStorageContainers()
	if err != nil {
		return err
	}
	if len(targets) == 0 {
		return errors.ES(errors.OpFileIngest, errors.KBlobstore, "no Blob Storage container resources are defined, there is no container to upload to").SetNoRetry()
	}

	// Make sure a Kusto queue exists before uploading anything, so we never end
	// up with an uploaded blob and nowhere to enqueue it.
	kustoQueues, err := i.mgr.GetRankedStorageQueues()
	if err != nil {
		return err
	}
	if len(kustoQueues) == 0 {
		return errors.ES(
			errors.OpFileIngest,
			errors.KBlobstore,
			"no Kusto queue resources are defined, there is no queue to upload to",
		).SetNoRetry()
	}

	// Walk the ranked containers; the first successful upload ends the loop.
	for tried, target := range targets {
		if tried >= StorageMaxRetryPolicy {
			return errors.ES(errors.OpFileIngest, errors.KBlobstore, "max retry policy reached").SetNoRetry()
		}

		// report records the outcome of this attempt against the container's account.
		report := func(ok bool) {
			i.mgr.ReportStorageResourceResult(target.Account(), ok)
		}

		client, name, err := i.upstreamContainer(target)
		if err != nil {
			// Could not build a client for this container; move on to the next one.
			report(false)
			continue
		}

		blobURL, size, err := i.localToBlob(ctx, from, client, name, &props)
		switch {
		case err == nil:
			report(true)
			return i.Blob(ctx, blobURL, size, props)
		case !errors.Retry(err):
			// Non-retryable failures are surfaced immediately, without a report.
			return err
		}

		// Retryable failure: mark the account as failed and try the next container.
		report(false)
	}

	return errors.ES(errors.OpFileIngest, errors.KBlobstore, "could not upload file to any container")
}