func()

in azkustoingest/internal/queued/queued.go [168:238]


// Reader uploads the data read from reader to a Blob Storage container and then calls i.Blob() on the
// uploaded blob so that it gets queued for ingestion. It returns the generated blob name.
//
// The ranked containers returned by the resource manager are tried in order, up to StorageMaxRetryPolicy
// attempts. The outcome of every attempt is reported back to the manager via ReportStorageResourceResult().
// If ShouldCompress() says so, the stream is wrapped with gzip.Compress() before it is uploaded.
//
// An error is returned when no containers or queues are available, when ctx is done while uploading, when
// every attempted container fails (the last underlying failure is included in the message), or when
// i.Blob() fails. In the last two cases the blob name is still returned alongside the error.
func (i *Ingestion) Reader(ctx context.Context, reader io.Reader, props properties.All) (string, error) {
	containers, err := i.mgr.GetRankedStorageContainers()
	if err != nil {
		return "", err
	}

	if len(containers) == 0 {
		return "", errors.ES(
			errors.OpFileIngest,
			errors.KBlobstore,
			"no Blob Storage container resources are defined, there is no container to upload to",
		).SetNoRetry()
	}

	queues, err := i.mgr.GetRankedStorageQueues()
	if err != nil {
		return "", err
	}

	// Check for queues before uploading anything, so we don't upload a blob and only then find out that there
	// is no Kusto queue to put it in.
	if len(queues) == 0 {
		return "", errors.ES(errors.OpFileIngest, errors.KBlobstore, "no Kusto queue resources are defined, there is no queue to upload to").SetNoRetry()
	}

	compression := utils.CompressionDiscovery(props.Source.OriginalSource)
	shouldCompress := ShouldCompress(&props, compression)
	blobName := GenBlobName(i.db, i.table, nower(), filepath.Base(uuid.New().String()), filepath.Base(props.Source.OriginalSource), compression, shouldCompress, props.Ingestion.Additional.Format.String())

	if shouldCompress {
		reader = gzip.Compress(reader)
	}

	// lastErr remembers why the most recent attempt failed, so that the error we eventually return names the
	// underlying cause instead of hiding it.
	var lastErr error

	// Go over the containers and try to upload to each one in turn. If we succeed, we are done.
	//
	// NOTE(review): the same reader is handed to every attempt. If a failed attempt has already consumed part
	// of it, a later attempt would presumably upload only the remainder - confirm whether uploadStream() can
	// fail after reading from the stream and whether buffering/rewinding is needed.
	for attempts, containerUri := range containers {
		if attempts >= StorageMaxRetryPolicy {
			if lastErr == nil {
				return "", errors.ES(errors.OpFileIngest, errors.KBlobstore, "max retry policy reached").SetNoRetry()
			}
			return "", errors.ES(errors.OpFileIngest, errors.KBlobstore, "max retry policy reached: %s", lastErr).SetNoRetry()
		}

		client, containerName, err := i.upstreamContainer(containerUri)
		if err != nil {
			lastErr = err
			i.mgr.ReportStorageResourceResult(containerUri.Account(), false)
			continue
		}

		_, err = i.uploadStream(
			ctx,
			reader,
			client,
			containerName,
			blobName,
			&azblob.UploadStreamOptions{BlockSize: int64(i.bufferSize), Concurrency: i.maxBuffers},
		)
		if err != nil {
			// A cancelled or expired context is not the storage account's fault, and no other container can
			// succeed with it either: stop here without marking the account as unhealthy.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return "", errors.ES(errors.OpFileIngest, errors.KBlobstore, "upload to Blob Storage aborted: %s", ctxErr)
			}

			lastErr = err
			i.mgr.ReportStorageResourceResult(containerUri.Account(), false)
			continue
		}

		i.mgr.ReportStorageResourceResult(containerUri.Account(), true)

		// When the stream is a gzip.Streamer, report the number of input bytes it consumed; otherwise the
		// size is passed on as 0.
		size := int64(0)
		if gz, ok := reader.(*gzip.Streamer); ok {
			size = gz.InputSize()
		}

		err = i.Blob(ctx, fullUrl(client, containerName, blobName), size, props)
		return blobName, err
	}

	// Every container was tried and failed. The loop only falls through after a failed attempt, so lastErr is
	// set here.
	return blobName, errors.ES(errors.OpFileIngest, errors.KBlobstore, "problem uploading to Blob Storage: %s", lastErr)
}