func()

in azkustoingest/internal/queued/queued.go [347:409]


// localToBlob uploads the local file at path from into the given blob
// container using client, and returns the URL of the resulting blob, a byte
// count for the source data, and an error.
//
// The blob name is generated from the ingestion's database and table, the
// current time, a fresh UUID, the base name of the source file, the detected
// compression and the ingestion format taken from props.
//
// When ShouldCompress reports that the file must be compressed, the file is
// wrapped in a gzip streamer and sent through i.uploadStream; the returned
// size is then the streamer's InputSize() (presumably the number of
// uncompressed bytes consumed — confirm against the gzip package). Otherwise
// the file is sent as-is through i.uploadBlob and the returned size is the
// size reported by Stat.
//
// Failures to open or stat the file are local file system errors and are
// marked as non-retryable. Upload failures are reported as blob store errors.
func (i *Ingestion) localToBlob(ctx context.Context, from string, client *azblob.Client, container string, props *properties.All) (string, int64, error) {
	detected := utils.CompressionDiscovery(from)
	mustGzip := ShouldCompress(props, detected)

	blobName := GenBlobName(
		i.db,
		i.table,
		nower(),
		filepath.Base(uuid.New().String()),
		filepath.Base(from),
		detected,
		mustGzip,
		props.Ingestion.Additional.Format.String(),
	)

	src, err := os.Open(from)
	if err != nil {
		return "", 0, errors.ES(errors.OpFileIngest, errors.KLocalFileSystem, "problem retrieving source file %q: %s", from, err).SetNoRetry()
	}
	defer src.Close()

	// The file is stat'ed up front, before either upload path runs, so a
	// Stat failure aborts the ingestion regardless of compression.
	info, err := src.Stat()
	if err != nil {
		return "", 0, errors.ES(errors.OpFileIngest, errors.KLocalFileSystem, "could not Stat the file(%s): %s", from, err).SetNoRetry()
	}

	if !mustGzip {
		// Uncompressed path: hand the open file straight to the file
		// uploader with the package-level block size and concurrency. Per
		// the original note, the underlying high-level upload API sends
		// blocks in parallel and switches to StageBlock/CommitBlockList for
		// files above 256 MB, using a single Upload for smaller ones.
		fileOpts := &azblob.UploadFileOptions{
			BlockSize:   BlockSize,
			Concurrency: Concurrency,
		}
		if _, err = i.uploadBlob(ctx, src, client, container, blobName, fileOpts); err != nil {
			return "", 0, errors.ES(errors.OpFileIngest, errors.KBlobstore, "problem uploading to Blob Storage: %s", err)
		}

		return fullUrl(client, container, blobName), info.Size(), nil
	}

	// Compressed path: stream the file through gzip while uploading, sized
	// by this ingestion's buffer settings.
	zipped := gzip.New()
	zipped.Reset(src)

	streamOpts := &azblob.UploadStreamOptions{
		BlockSize:   int64(i.bufferSize),
		Concurrency: i.maxBuffers,
	}
	if _, err = i.uploadStream(ctx, zipped, client, container, blobName, streamOpts); err != nil {
		return "", 0, errors.ES(errors.OpFileIngest, errors.KBlobstore, "problem uploading to Blob Storage: %s", err)
	}

	return fullUrl(client, container, blobName), zipped.InputSize(), nil
}