azkustoingest/streaming.go (156 lines of code) (raw):
package azkustoingest
import (
"bytes"
"context"
"encoding/json"
"github.com/Azure/azure-kusto-go/azkustoingest/ingestoptions"
"github.com/Azure/azure-kusto-go/azkustoingest/internal/utils"
"io"
"os"
"github.com/Azure/azure-kusto-go/azkustodata"
"github.com/Azure/azure-kusto-go/azkustodata/errors"
"github.com/Azure/azure-kusto-go/azkustoingest/internal/gzip"
"github.com/Azure/azure-kusto-go/azkustoingest/internal/properties"
"github.com/Azure/azure-kusto-go/azkustoingest/internal/queued"
"github.com/google/uuid"
)
// streamIngestor is the connection used to submit a streaming ingestion request.
// In this file it is satisfied by the value returned from azkustodata.NewConn
// (see newStreamingFromClient) and consumed by streamImpl.
type streamIngestor interface {
// Close releases the connection; Streaming.Close delegates to it.
io.Closer
// StreamIngest sends payload to the given database and table using the given
// data format, optional ingestion mapping reference and client request id.
// isBlobUri is true when payload is not the data itself but a JSON document
// naming the blob to ingest (see generateBlobUriPayloadReader).
StreamIngest(ctx context.Context, db, table string, payload io.Reader, format azkustodata.DataFormatForStreaming, mappingName string, clientRequestId string, isBlobUri bool) error
}
// Streaming provides data ingestion from external sources into Kusto by sending
// the data through a streaming connection (see FromFile and FromReader).
type Streaming struct {
// db is the database name placed into the properties of every ingestion (see newProp).
db string
// table is the table name placed into the properties of every ingestion (see newProp).
table string
// client is the query client the streaming connection was derived from.
client QueryClient
// streamConn carries the actual streaming ingestion requests and is closed by Close.
streamConn streamIngestor
}
// blobUri is the JSON body sent in place of file content when the ingestion
// source is a remote path; it is encoded by generateBlobUriPayloadReader.
type blobUri struct {
// SourceUri is the path of the source to ingest, serialized as "sourceUri".
SourceUri string `json:"sourceUri"`
}
// NewStreaming builds a Streaming ingestor from a connection string builder.
//
// Unless the withoutEndpointCorrection option is set, the query client is created
// from a copy of kcsb whose DataSource has been passed through removeIngestPrefix;
// the caller's builder is never modified. In that case kcsb is dereferenced, so it
// must not be nil.
//
// More information can be found here:
// https://docs.microsoft.com/en-us/azure/kusto/management/create-ingestion-mapping-command
func NewStreaming(kcsb *azkustodata.ConnectionStringBuilder, options ...Option) (*Streaming, error) {
	opts := getOptions(options)

	effective := kcsb
	if !opts.withoutEndpointCorrection {
		// Work on a copy so the correction is not visible to the caller.
		corrected := *kcsb
		corrected.DataSource = removeIngestPrefix(corrected.DataSource)
		effective = &corrected
	}

	queryClient, err := azkustodata.New(effective)
	if err != nil {
		return nil, err
	}
	return newStreamingFromClient(queryClient, opts)
}
// newStreamingFromClient wires a Streaming ingestor around an existing query client.
// The streaming connection targets the client's endpoint after removeIngestPrefix and
// reuses the client's auth, HTTP client and client details. If the connection cannot
// be created, the client is closed and the error is returned.
func newStreamingFromClient(client QueryClient, o *Ingestion) (*Streaming, error) {
	endpoint := removeIngestPrefix(client.Endpoint())

	conn, err := azkustodata.NewConn(endpoint, client.Auth(), client.HttpClient(), client.ClientDetails())
	if err != nil {
		// Best-effort cleanup; the connection error is the one reported to the caller.
		client.Close()
		return nil, err
	}

	return &Streaming{db: o.db, table: o.table, client: client, streamConn: conn}, nil
}
// FromFile ingests the data at fPath, which may be a local path or a blobstore URI.
//
// A local file is opened and its content is streamed; the file is closed when the
// call returns. For a non-local path no content is read: a small JSON document
// naming fPath as the source URI is sent instead.
//
// Every call builds its own property set and does not modify the Streaming value.
// This method is thread-safe.
func (i *Streaming) FromFile(ctx context.Context, fPath string, options ...FileOption) (*Result, error) {
	props := i.newProp()

	file, err, isLocal := prepFileAndProps(fPath, &props, options, StreamingClient)
	switch {
	case err != nil:
		return nil, err
	case isLocal:
		// The deferred Close runs only after streamImpl has finished with the file.
		defer file.Close()
		return streamImpl(i.streamConn, ctx, file, props, false)
	default:
		return streamImpl(i.streamConn, ctx, generateBlobUriPayloadReader(fPath), props, true)
	}
}
// prepFileAndProps applies options to props and, for a local path, completes the
// source-related properties and opens the file.
//
// It returns, in this order: the opened file (nil unless fPath is local and could be
// opened), an error, and whether fPath is a local path. For a non-local path it
// returns (nil, nil, false) without touching the source properties. The boolean is
// only meaningful when the error is nil.
func prepFileAndProps(fPath string, props *properties.All, options []FileOption, client ClientScope) (*os.File, error, bool) {
	for idx := range options {
		if optErr := options[idx].Run(props, client, FromFile); optErr != nil {
			return nil, optErr, true
		}
	}

	var err error
	isLocal, err := queued.IsLocalPath(fPath)
	switch {
	case err != nil:
		return nil, err, isLocal
	case !isLocal:
		return nil, nil, false
	}

	props.Source.OriginalSource = fPath
	detected := utils.CompressionDiscovery(fPath)
	if err = queued.CompleteFormatFromFileName(props, fPath); err != nil {
		return nil, err, true
	}
	// Compression is decided from the properties together with what the file name revealed.
	props.Source.DontCompress = !queued.ShouldCompress(props, detected)

	opened, err := os.Open(fPath)
	if err != nil {
		return nil, err, true
	}
	return opened, nil, true
}
// FromReader ingests the data read from reader by sending it over the streaming
// connection. The options are applied to a fresh property set first; the first
// option that fails aborts the call with its error.
//
// The content is gzip-compressed before sending when queued.ShouldCompress reports
// so for the resulting properties, so content should normally be supplied
// uncompressed. This method is thread-safe.
func (i *Streaming) FromReader(ctx context.Context, reader io.Reader, options ...FileOption) (*Result, error) {
	props := i.newProp()

	for _, opt := range options {
		if err := opt.Run(&props, StreamingClient, FromReader); err != nil {
			return nil, err
		}
	}

	return streamImpl(i.streamConn, ctx, reader, props, false)
}
// streamImpl performs one streaming ingestion of payload through c using props.
//
// The payload is gzip-compressed when queued.ShouldCompress says so, except when
// isBlobUri is true. An unknown data format defaults to CSV. A failure of
// StreamIngest is returned as the Kusto error it carries when there is one, and
// otherwise wrapped as an OpIngestStream/KClientArgs error. After a successful
// ingestion the delete-local-source option is applied, and a Result with status
// "Success" is returned.
func streamImpl(c streamIngestor, ctx context.Context, payload io.Reader, props properties.All, isBlobUri bool) (*Result, error) {
	shouldGzip := queued.ShouldCompress(&props, ingestoptions.CTUnknown)
	if !isBlobUri && shouldGzip {
		payload = gzip.Compress(payload)
	}

	ingestion := &props.Ingestion
	if ingestion.Additional.Format == DFUnknown {
		ingestion.Additional.Format = CSV
	}

	err := c.StreamIngest(
		ctx,
		ingestion.DatabaseName,
		ingestion.TableName,
		payload,
		ingestion.Additional.Format,
		ingestion.Additional.IngestionMappingRef,
		props.Streaming.ClientRequestId,
		isBlobUri,
	)
	if err != nil {
		// Prefer the structured Kusto error when the failure carries one.
		if kustoErr, ok := errors.GetKustoError(err); ok {
			return nil, kustoErr
		}
		return nil, errors.E(errors.OpIngestStream, errors.KClientArgs, err)
	}

	if err = props.ApplyDeleteLocalSourceOption(); err != nil {
		return nil, err
	}

	res := newResult()
	res.putProps(props)
	res.record.Status = "Success"
	return res, nil
}
// newProp returns a fresh property set for one ingestion: the database and table
// come from the Streaming value, and the client request id is the
// "KGC.executeStreaming;" prefix followed by a newly generated UUID.
func (i *Streaming) newProp() properties.All {
	var props properties.All
	props.Ingestion.DatabaseName = i.db
	props.Ingestion.TableName = i.table
	props.Streaming.ClientRequestId = "KGC.executeStreaming;" + uuid.New().String()
	return props
}
// Close closes the streaming connection and returns its error, if any.
// NOTE(review): the query client held in i.client is not closed here — confirm
// who owns it before relying on Close to release every resource.
func (i *Streaming) Close() error {
	conn := i.streamConn
	return conn.Close()
}
// generateBlobUriPayloadReader returns a reader over the JSON document
// {"sourceUri": fPath} followed by a newline, which is the request body used when
// the ingestion source is a remote path rather than inline data.
func generateBlobUriPayloadReader(fPath string) io.Reader {
	var body bytes.Buffer

	enc := json.NewEncoder(&body)
	// Encoding a struct with one string field into an in-memory buffer is not
	// expected to fail, and this function has no error return, so the result is
	// deliberately discarded.
	_ = enc.Encode(blobUri{SourceUri: fPath})

	return io.NopCloser(&body)
}