azkustodata/kusto.go (234 lines of code) (raw):

package azkustodata import ( "context" "github.com/Azure/azure-kusto-go/azkustodata/kql" "github.com/Azure/azure-kusto-go/azkustodata/query" "github.com/Azure/azure-kusto-go/azkustodata/value" v1 "github.com/Azure/azure-kusto-go/azkustodata/query/v1" queryv2 "github.com/Azure/azure-kusto-go/azkustodata/query/v2" "io" "net/http" "time" "github.com/Azure/azure-kusto-go/azkustodata/errors" ) type Statement = *kql.Builder // queryer provides for getting a stream of Kusto frames. Exists to allow fake Kusto streams in tests. type queryer interface { io.Closer rawQuery(ctx context.Context, callType callType, db string, query Statement, options *queryOptions) (io.ReadCloser, error) } // Authorization provides the TokenProvider needed to acquire the auth token. type Authorization struct { // Token provider that can be used to get the access token. TokenProvider *TokenProvider } const ( defaultMgmtTimeout = time.Hour defaultQueryTimeout = 4 * time.Minute clientServerDelta = 30 * time.Second ) // Client is a client to a Kusto instance. type Client struct { conn queryer endpoint string auth Authorization http *http.Client clientDetails *ClientDetails } // Option is an optional argument type for New(). type Option func(c *Client) // New returns a new Client. func New(kcsb *ConnectionStringBuilder, options ...Option) (*Client, error) { tkp, err := kcsb.newTokenProvider() if err != nil { return nil, err } auth := &Authorization{ TokenProvider: tkp, } endpoint := kcsb.DataSource client := &Client{auth: *auth, endpoint: endpoint, clientDetails: NewClientDetails(kcsb.ApplicationForTracing, kcsb.UserForTracing)} for _, o := range options { o(client) } if client.http == nil { client.http = &http.Client{ CheckRedirect: func(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse }, } } conn, err := NewConn(endpoint, *auth, client.http, client.clientDetails) if err != nil { return nil, err } client.conn = conn return client, nil } func WithHttpClient(client *http.Client) Option { return func(c *Client) { c.http = client } } // QueryOption is an option type for a call to Query(). type QueryOption func(q *queryOptions) error // Auth returns the Authorization passed to New(). func (c *Client) Auth() Authorization { return c.auth } // Endpoint returns the endpoint passed to New(). func (c *Client) Endpoint() string { return c.endpoint } type callType int8 const ( queryCall = 1 mgmtCall = 2 ) func (c *Client) Mgmt(ctx context.Context, db string, kqlQuery Statement, options ...QueryOption) (v1.Dataset, error) { ctx, cancel := contextSetup(ctx) opQuery := errors.OpMgmt call := mgmtCall opts, err := setQueryOptions(ctx, opQuery, kqlQuery, call, options...) if err != nil { return nil, err } conn, err := c.getConn(callType(call), connOptions{queryOptions: opts}) if err != nil { return nil, err } res, err := conn.rawQuery(ctx, callType(call), db, kqlQuery, opts) if err != nil { cancel() return nil, err } return v1.NewDatasetFromReader(ctx, opQuery, res) } func (c *Client) Query(ctx context.Context, db string, kqlQuery Statement, options ...QueryOption) (query.Dataset, error) { ds, err := c.IterativeQuery(ctx, db, kqlQuery, options...) if err != nil { return nil, err } return ds.ToDataset() } func (c *Client) IterativeQuery(ctx context.Context, db string, kqlQuery Statement, options ...QueryOption) (query.IterativeDataset, error) { options = append(options, V2NewlinesBetweenFrames()) options = append(options, V2FragmentPrimaryTables()) options = append(options, ResultsErrorReportingPlacement(ResultsErrorReportingPlacementEndOfTable)) opts, res, err := c.rawV2(ctx, db, kqlQuery, options) if err != nil { return nil, err } frameCapacity := queryv2.DefaultIoCapacity if opts.v2IoCapacity != -1 { frameCapacity = opts.v2IoCapacity } rowCapacity := queryv2.DefaultRowCapacity if opts.v2RowCapacity != -1 { rowCapacity = opts.v2RowCapacity } fragmentCapacity := queryv2.DefaultTableCapacity if opts.v2TableCapacity != -1 { fragmentCapacity = opts.v2TableCapacity } return queryv2.NewIterativeDataset(ctx, res, frameCapacity, rowCapacity, fragmentCapacity) } func (c *Client) RawV2(ctx context.Context, db string, kqlQuery Statement, options []QueryOption) (io.ReadCloser, error) { _, res, err := c.rawV2(ctx, db, kqlQuery, options) return res, err } func (c *Client) rawV2(ctx context.Context, db string, kqlQuery Statement, options []QueryOption) (*queryOptions, io.ReadCloser, error) { ctx, cancel := contextSetup(ctx) opQuery := errors.OpQuery opts, err := setQueryOptions(ctx, opQuery, kqlQuery, queryCall, options...) if err != nil { return nil, nil, err } conn, err := c.getConn(queryCall, connOptions{queryOptions: opts}) if err != nil { return nil, nil, err } res, err := conn.rawQuery(ctx, queryCall, db, kqlQuery, opts) if err != nil { cancel() return nil, nil, err } return opts, res, nil } func (c *Client) QueryToJson(ctx context.Context, db string, query Statement, options ...QueryOption) (string, error) { _, res, err := c.rawV2(ctx, db, query, options) if err != nil { return "", err } all, err := io.ReadAll(res) if err != nil { return "", err } return string(all), nil } func setQueryOptions(ctx context.Context, op errors.Op, query Statement, queryType int, options ...QueryOption) (*queryOptions, error) { opt := &queryOptions{ requestProperties: &requestProperties{ Options: map[string]interface{}{}, }, v2IoCapacity: -1, v2RowCapacity: -1, v2TableCapacity: -1, } for _, o := range options { if err := o(opt); err != nil { return nil, errors.ES(op, errors.KClientArgs, "QueryValues in the the Stmt were incorrect: %s", err).SetNoRetry() } } CalculateTimeout(ctx, opt, queryType) if query.SupportsInlineParameters() { if opt.requestProperties.QueryParameters.Count() != 0 { return nil, errors.ES(op, errors.KClientArgs, "kusto.Stmt does not support the QueryParameters option. Construct your query using `kql.New`").SetNoRetry() } params, err := query.GetParameters() if err != nil { return nil, errors.ES(op, errors.KClientArgs, "Parameter validation error: %s", err).SetNoRetry() } opt.requestProperties.Parameters = params } return opt, nil } var nower = time.Now func CalculateTimeout(ctx context.Context, opt *queryOptions, queryType int) { // If the user has specified a timeout, use that. if val, ok := opt.requestProperties.Options[NoRequestTimeoutValue]; ok && val.(bool) { return } if _, ok := opt.requestProperties.Options[ServerTimeoutValue]; ok { return } // Otherwise use the context deadline, if it exists. If it doesn't, use the default timeout. if deadline, ok := ctx.Deadline(); ok { opt.requestProperties.Options[ServerTimeoutValue] = value.TimespanString(deadline.Sub(nower())) return } var timeout time.Duration switch queryType { case queryCall: timeout = defaultQueryTimeout case mgmtCall: timeout = defaultMgmtTimeout } opt.requestProperties.Options[ServerTimeoutValue] = value.TimespanString(timeout) } func (c *Client) getConn(callType callType, options connOptions) (queryer, error) { switch callType { case queryCall: return c.conn, nil case mgmtCall: delete(options.queryOptions.requestProperties.Options, "results_progressive_enabled") return c.conn, nil default: return nil, errors.ES(errors.OpServConn, errors.KInternal, "an unknown calltype was passed to getConn()") } } func contextSetup(ctx context.Context) (context.Context, context.CancelFunc) { return context.WithCancel(ctx) } func (c *Client) HttpClient() *http.Client { return c.http } func (c *Client) ClientDetails() *ClientDetails { return c.clientDetails } func (c *Client) Close() error { var err error if c.conn != nil { err = c.conn.Close() } return err }