azkustoingest/ingest.go (170 lines of code) (raw):

package azkustoingest import ( "context" "fmt" "github.com/Azure/azure-kusto-go/azkustodata" "github.com/Azure/azure-kusto-go/azkustodata/errors" "github.com/Azure/azure-kusto-go/azkustoingest/internal/properties" "github.com/Azure/azure-kusto-go/azkustoingest/internal/queued" "github.com/Azure/azure-kusto-go/azkustoingest/internal/resources" "github.com/google/uuid" "io" ) type Ingestor interface { io.Closer FromFile(ctx context.Context, fPath string, options ...FileOption) (*Result, error) FromReader(ctx context.Context, reader io.Reader, options ...FileOption) (*Result, error) } // Ingestion provides data ingestion from external sources into Kusto. type Ingestion struct { db string table string client QueryClient mgr *resources.Manager fs queued.Queued bufferSize int maxBuffers int withoutEndpointCorrection bool customIngestConnectionString *azkustodata.ConnectionStringBuilder applicationForTracing string clientVersionForTracing string } // New is a constructor for Ingestion. func New(kcsb *azkustodata.ConnectionStringBuilder, options ...Option) (*Ingestion, error) { i := getOptions(options) if !i.withoutEndpointCorrection { newKcsb := *kcsb newKcsb.DataSource = addIngestPrefix(newKcsb.DataSource) kcsb = &newKcsb } clientDetails := azkustodata.NewClientDetails(kcsb.ApplicationForTracing, kcsb.UserForTracing) i.applicationForTracing = clientDetails.ApplicationForTracing() i.clientVersionForTracing = clientDetails.ClientVersionForTracing() client, err := azkustodata.New(kcsb) if err != nil { return nil, err } return newFromClient(client, i) } func newFromClient(client QueryClient, i *Ingestion) (*Ingestion, error) { mgr, err := resources.New(client) if err != nil { client.Close() return nil, err } i.client = client i.mgr = mgr fs, err := queued.New(i.db, i.table, mgr, client.HttpClient(), i.applicationForTracing, i.clientVersionForTracing, queued.WithStaticBuffer(i.bufferSize, i.maxBuffers)) if err != nil { mgr.Close() client.Close() return nil, err } i.fs = fs return i, nil } func (i *Ingestion) prepForIngestion(ctx context.Context, options []FileOption, props properties.All, source SourceScope) (*Result, properties.All, error) { result := newResult() auth, err := i.mgr.AuthContext(ctx) if err != nil { return nil, properties.All{}, err } props.Ingestion.Additional.AuthContext = auth for _, o := range options { if err := o.Run(&props, QueuedClient, source); err != nil { return nil, properties.All{}, err } } if source == FromReader && props.Ingestion.Additional.Format == DFUnknown { props.Ingestion.Additional.Format = CSV } if props.Ingestion.Additional.IngestionMappingType != DFUnknown && props.Ingestion.Additional.Format.MappingKind() != props.Ingestion.Additional.IngestionMappingType { return nil, properties.All{}, errors.ES( errors.OpUnknown, errors.KClientArgs, "format and ingestion mapping type must match (hint: using ingestion mapping sets the format automatically)", ).SetNoRetry() } if props.Ingestion.ReportLevel != properties.None { if props.Source.ID == uuid.Nil { props.Source.ID = uuid.New() } switch props.Ingestion.ReportMethod { case properties.ReportStatusToTable, properties.ReportStatusToQueueAndTable: tableResources, err := i.mgr.GetTables() if err != nil { return nil, properties.All{}, err } if len(tableResources) == 0 { return nil, properties.All{}, fmt.Errorf("User requested reporting status to table, yet status table resource URI is not found") } props.Ingestion.TableEntryRef.TableConnectionString = tableResources[0].URL().String() props.Ingestion.TableEntryRef.PartitionKey = props.Source.ID.String() props.Ingestion.TableEntryRef.RowKey = uuid.Nil.String() } } result.putProps(props) return result, props, nil } // FromFile allows uploading a data file for Kusto from either a local path or a blobstore URI path. // This method is thread-safe. func (i *Ingestion) FromFile(ctx context.Context, fPath string, options ...FileOption) (*Result, error) { return i.fromFile(ctx, fPath, options, i.newProp()) } // fromFile is an internal function to allow managed streaming to pass a properties object to the ingestion. func (i *Ingestion) fromFile(ctx context.Context, fPath string, options []FileOption, props properties.All) (*Result, error) { local, err := queued.IsLocalPath(fPath) if err != nil { return nil, err } var scope SourceScope if local { scope = FromFile props.Source.OriginalSource = fPath } else { scope = FromBlob } result, props, err := i.prepForIngestion(ctx, options, props, scope) if err != nil { return nil, err } result.record.IngestionSourcePath = fPath if local { err = i.fs.Local(ctx, fPath, props) } else { err = i.fs.Blob(ctx, fPath, 0, props) } if err != nil { return nil, err } result.putQueued(ctx, i) return result, nil } // FromReader allows uploading a data file for Kusto from an io.Reader. The content is uploaded to Blobstore and // ingested after all data in the reader is processed. Content should not use compression as the content will be // compressed with gzip. This method is thread-safe. func (i *Ingestion) FromReader(ctx context.Context, reader io.Reader, options ...FileOption) (*Result, error) { return i.fromReader(ctx, reader, options, i.newProp()) } // fromReader is an internal function to allow managed streaming to pass a properties object to the ingestion. func (i *Ingestion) fromReader(ctx context.Context, reader io.Reader, options []FileOption, props properties.All) (*Result, error) { result, props, err := i.prepForIngestion(ctx, options, props, FromReader) if err != nil { return nil, err } path, err := i.fs.Reader(ctx, reader, props) if err != nil { return nil, err } result.record.IngestionSourcePath = path result.putQueued(ctx, i) return result, nil } func (i *Ingestion) newProp() properties.All { return properties.All{ Ingestion: properties.Ingestion{ DatabaseName: i.db, TableName: i.table, }, } } func (i *Ingestion) Close() error { i.mgr.Close() err := i.client.Close() if err != nil { return err } err = i.fs.Close() return err }