// azkustoingest/managed.go — managed (streaming with queued fallback) ingestion client.
package azkustoingest
import (
"bytes"
"context"
"fmt"
"github.com/Azure/azure-kusto-go/azkustodata"
"github.com/Azure/azure-kusto-go/azkustoingest/ingestoptions"
"github.com/Azure/azure-kusto-go/azkustoingest/internal/queued"
"io"
"time"
"github.com/Azure/azure-kusto-go/azkustodata/errors"
"github.com/Azure/azure-kusto-go/azkustoingest/internal/gzip"
"github.com/Azure/azure-kusto-go/azkustoingest/internal/properties"
"github.com/Azure/azure-kusto-go/azkustoingest/internal/utils"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
)
const (
	// mb is the number of bytes in one megabyte.
	mb = 1024 * 1024
	// maxStreamingSize is the maximum payload size eligible for streaming
	// ingestion; larger payloads fall back to queued ingestion.
	maxStreamingSize = int64(4 * mb)
	// defaultInitialInterval and defaultMultiplier configure the exponential
	// backoff used between streaming retry attempts (see newProp).
	defaultInitialInterval = 1 * time.Second
	defaultMultiplier      = 2
	// retryCount is the number of retries (beyond the first attempt) allowed
	// for transient streaming failures.
	retryCount = 2
)
// Managed is an ingestion client that attempts streaming ingestion first and
// falls back to queued ingestion when the payload is too large or streaming
// fails transiently.
type Managed struct {
	queued    *Ingestion // fallback queued-ingestion client
	streaming *Streaming // primary streaming-ingestion client
}
// NewManaged is a constructor for Managed.
func NewManaged(kcsb *azkustodata.ConnectionStringBuilder, options ...Option) (*Managed, error) {
	o := getOptions(options)

	// The queued client may target a dedicated ingest endpoint when one was
	// supplied via options; the streaming client always uses kcsb as given.
	ingestKcsb := kcsb
	if o.customIngestConnectionString != nil {
		ingestKcsb = o.customIngestConnectionString
	}

	queuedClient, err := New(ingestKcsb, options...)
	if err != nil {
		return nil, err
	}

	streamingClient, err := NewStreaming(kcsb, options...)
	if err != nil {
		return nil, err
	}

	return &Managed{
		queued:    queuedClient,
		streaming: streamingClient,
	}, nil
}
// newManagedFromClients wires pre-built queued and streaming clients into a
// Managed instance.
func newManagedFromClients(queued *Ingestion, streaming *Streaming) *Managed {
	m := &Managed{}
	m.queued = queued
	m.streaming = streaming
	return m
}
// streamWithRetries attempts streaming ingestion with retries on transient errors.
// Return contract:
//   - success:                              (res, nil)
//   - permanent failure:                    (nil, err)
//   - transient failure, retries exhausted: (nil, nil) — caller should fall
//     back to queued ingestion.
func (m *Managed) streamWithRetries(ctx context.Context, payloadProvider func() io.Reader, props properties.All, isBlobUri bool) (*Result, error) {
	var result *Result
	hasCustomId := props.Streaming.ClientRequestId != ""
	attempt := 0
	managedUuid := uuid.New().String()
	actualBackoff := backoff.WithContext(backoff.WithMaxRetries(props.ManagedStreaming.Backoff, retryCount), ctx)
	err := backoff.Retry(func() error {
		if !hasCustomId {
			// Tag every attempt with a distinct, traceable client request id.
			props.Streaming.ClientRequestId = fmt.Sprintf("KGC.executeManagedStreamingIngest;%s;%d", managedUuid, attempt)
		}
		var innerErr error
		result, innerErr = streamImpl(m.streaming.streamConn, ctx, payloadProvider(), props, isBlobUri)
		attempt++
		if innerErr == nil {
			return nil
		}
		// Only typed Kusto errors marked as retryable are retried; everything
		// else aborts the backoff loop immediately.
		if e, ok := innerErr.(*errors.Error); ok && errors.Retry(e) {
			return innerErr
		}
		return backoff.Permanent(innerErr)
	}, actualBackoff)
	if err == nil {
		return result, nil
	}
	if errors.Retry(err) {
		// Transient failure after exhausting retries — caller should fall back
		// to queued ingestion.
		return nil, nil
	}
	return nil, err
}
// FromFile ingests from a local file path or a blob URI, streaming when the
// (estimated) size permits and falling back to queued ingestion otherwise.
func (m *Managed) FromFile(ctx context.Context, fPath string, options ...FileOption) (*Result, error) {
	props := m.newProp()
	file, err, local := prepFileAndProps(fPath, &props, options, ManagedClient)
	if err != nil {
		return nil, err
	}
	if !local {
		// Blob URI: decide streaming vs. queued based on the blob's size.
		var size int64
		var compressionTypeForEstimation ingestoptions.CompressionType
		if size = props.Ingestion.RawDataSize; size == 0 {
			size, err = utils.FetchBlobSize(fPath, ctx, m.queued.client.HttpClient())
			if err != nil {
				// Failed fetching blob properties.
				return nil, err
			}
			compressionTypeForEstimation = utils.CompressionDiscovery(fPath)
			props.Ingestion.RawDataSize = utils.EstimateRawDataSize(compressionTypeForEstimation, size)
		} else {
			// If the user sets the raw data size we always want to divide it for estimation.
			compressionTypeForEstimation = ingestoptions.CTNone
		}
		// Try streaming first if the estimated size is within the streaming limit.
		if !shouldUseQueuedIngestBySize(compressionTypeForEstimation, size) {
			res, err := m.streamWithRetries(ctx, func() io.Reader { return generateBlobUriPayloadReader(fPath) }, props, true)
			// A non-nil result or a permanent error ends the call; (nil, nil)
			// signals a transient failure — fall through to queued ingestion.
			if err != nil || res != nil {
				return res, err
			}
		}
		return m.queued.fromFile(ctx, fPath, []FileOption{}, props)
	}
	// Local file: no need to get local file size as we later use the compressed
	// stream size (see managedStreamImpl).
	return m.managedStreamImpl(ctx, file, props)
}
// shouldUseQueuedIngestBySize reports whether the payload is too large to
// stream. Already-compressed formats are compared directly against the limit;
// everything else is compared using the estimated compression factor.
func shouldUseQueuedIngestBySize(compression ingestoptions.CompressionType, fileSize int64) bool {
	if compression == ingestoptions.GZIP || compression == ingestoptions.ZIP {
		return fileSize > maxStreamingSize
	}
	return fileSize/utils.EstimatedCompressionFactor > maxStreamingSize
}
// FromReader ingests the contents of reader, applying any FileOptions before
// delegating to the managed streaming/queued implementation.
func (m *Managed) FromReader(ctx context.Context, reader io.Reader, options ...FileOption) (*Result, error) {
	props := m.newProp()
	for _, option := range options {
		if err := option.Run(&props, ManagedClient, FromReader); err != nil {
			return nil, err
		}
	}
	return m.managedStreamImpl(ctx, io.NopCloser(reader), props)
}
// managedStreamImpl ingests from a reader: it buffers up to the streaming size
// limit, streams if the (compressed) payload fits, and otherwise falls back to
// queued ingestion of the full stream.
func (m *Managed) managedStreamImpl(ctx context.Context, payload io.ReadCloser, props properties.All) (*Result, error) {
	defer payload.Close()

	compress := queued.ShouldCompress(&props, ingestoptions.CTUnknown)
	var compressed io.Reader = payload
	if compress {
		compressed = gzip.Compress(io.NopCloser(payload))
		// Payload is now gzip-compressed locally; mark it so it is not
		// compressed again downstream.
		props.Source.DontCompress = true
	}

	// Read one byte past the limit so "exactly at the limit" can be told apart
	// from "exceeds the limit".
	buf, err := io.ReadAll(io.LimitReader(compressed, maxStreamingSize+1))
	if err != nil {
		return nil, err
	}

	if shouldUseQueuedIngestBySize(ingestoptions.GZIP, int64(len(buf))) {
		// Too large to stream — stitch the buffered prefix back onto the rest
		// of the stream and ingest via the queued client.
		combinedBuf := io.MultiReader(bytes.NewReader(buf), compressed)
		return m.queued.fromReader(ctx, combinedBuf, []FileOption{}, props)
	}

	res, err := m.streamWithRetries(ctx, func() io.Reader { return bytes.NewReader(buf) }, props, false)
	if err != nil || res != nil {
		return res, err
	}

	// There's no size estimation when ingesting from a stream. If we did not
	// already use queued ingestion we can assume the whole original payload is
	// < 4mb, therefore no need to combine readers here.
	return m.queued.fromReader(ctx, bytes.NewReader(buf), []FileOption{}, props)
}
// newProp builds the default ingestion properties for this client, including
// the exponential backoff policy used by managed streaming retries.
func (m *Managed) newProp() properties.All {
	policy := backoff.NewExponentialBackOff()
	policy.InitialInterval = defaultInitialInterval
	policy.Multiplier = defaultMultiplier

	var props properties.All
	props.Ingestion.DatabaseName = m.streaming.db
	props.Ingestion.TableName = m.streaming.table
	props.ManagedStreaming.Backoff = policy
	return props
}
// Close releases both underlying clients, combining any errors encountered.
func (m *Managed) Close() error {
	queuedErr := m.queued.Close()
	streamingErr := m.streaming.Close()
	return errors.CombineErrors(queuedErr, streamingErr)
}