func()

in azkustoingest/internal/queued/queued.go [241:292]


func (i *Ingestion) Blob(ctx context.Context, from string, fileSize int64, props properties.All) error {
	// To learn more about ingestion properties, go to:
	// https://docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/#ingestion-properties
	// To learn more about ingestion methods go to:
	// https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods

	props.Ingestion.BlobPath = from
	if fileSize != 0 {
		props.Ingestion.RawDataSize = fileSize
	}

	props.Ingestion.RetainBlobOnSuccess = !props.Source.DeleteLocalSource
	props.Ingestion.ApplicationForTracing = i.applicationForTracing
	props.Ingestion.ClientVersionForTracing = i.clientVersionForTracing

	err := CompleteFormatFromFileName(&props, from)
	if err != nil {
		return err
	}

	j, err := props.Ingestion.MarshalJSONString()
	if err != nil {
		return errors.ES(errors.OpFileIngest, errors.KInternal, "could not marshal the ingestion blob info: %s", err).SetNoRetry()
	}

	queueResources, err := i.mgr.GetRankedStorageQueues()
	if err != nil {
		return err
	}

	// Go over all the queues and try to upload the file to each one. If we succeed, we are done.
	for attempts, queueUri := range queueResources {
		if attempts >= StorageMaxRetryPolicy {
			return errors.ES(errors.OpFileIngest, errors.KBlobstore, "max retry policy reached").SetNoRetry()
		}
		queue, err := i.upstreamQueue(queueUri)
		if err != nil {
			i.mgr.ReportStorageResourceResult(queueUri.Account(), false)
			continue
		}

		if _, err := queue.EnqueueMessage(ctx, j, nil); err != nil {
			i.mgr.ReportStorageResourceResult(queueUri.Account(), false)
			continue
		} else {
			i.mgr.ReportStorageResourceResult(queueUri.Account(), true)
			return props.ApplyDeleteLocalSourceOption()
		}
	}

	return errors.ES(errors.OpFileIngest, errors.KBlobstore, "could not upload file to any queue")
}