common/elasticsearch/client/os2/client.go (277 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package os2 import ( "context" "encoding/json" "errors" "fmt" "io" "net/http" "strings" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/opensearch-project/opensearch-go/v2" osapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/aws" "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/elasticsearch/client" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/types" ) type ( // OS2 implements Client OS2 struct { client *opensearch.Client logger log.Logger decoder *NumberDecoder } osError struct { Status int `json:"status"` Details *errorDetails `json:"error,omitempty"` } errorDetails struct { Type string `json:"type"` Reason string `json:"reason"` Index string `json:"index,omitempty"` } // response holds data retrieved from OpenSearch response struct { TookInMillis int64 `json:"took,omitempty"` Hits *searchHits Aggregations map[string]json.RawMessage `json:"aggregations,omitempty"` Sort []interface{} `json:"sort,omitempty"` // sort information ScrollID string `json:"_scroll_id,omitempty"` } // searchHits specifies the list of search hits. searchHits struct { TotalHits *totalHits `json:"total,omitempty"` // total number of hits found Hits []*searchHit `json:"hits,omitempty"` // the actual hits returned } // totalHits specifies total number of hits and its relation totalHits struct { Value int64 `json:"value"` // value of the total hit count } // searchHit is a single hit. searchHit struct { Index string `json:"_index,omitempty"` // index name ID string `json:"_id,omitempty"` // external or internal Sort []interface{} `json:"sort,omitempty"` // sort information Source json.RawMessage `json:"_source,omitempty"` // stored document source } ) // NewClient returns a new implementation of GenericClient func NewClient( connectConfig *config.ElasticSearchConfig, logger log.Logger, tlsClient *http.Client, ) (*OS2, error) { osconfig := opensearch.Config{ Addresses: []string{connectConfig.URL.String()}, MaxRetries: 5, RetryBackoff: func(i int) time.Duration { return time.Duration(i) * 100 * time.Millisecond }, } // DiscoverNodesOnStart is false by default. Turn it on only when disable sniff is set to False in ES config if !connectConfig.DisableSniff { osconfig.DiscoverNodesOnStart = true } if connectConfig.AWSSigning.Enable { credentials, region, err := connectConfig.AWSSigning.GetCredentials() if err != nil { return nil, fmt.Errorf("getting aws credentials: %w", err) } sessionOptions := session.Options{ Config: aws.Config{ Region: region, Credentials: credentials, }, } signer, err := requestsigner.NewSigner(sessionOptions) if err != nil { return nil, fmt.Errorf("creating aws signer: %w", err) } osconfig.Signer = signer } if tlsClient != nil { osconfig.Transport = tlsClient.Transport logger.Info("Using TLS client") } osClient, err := opensearch.NewClient(osconfig) if err != nil { return nil, fmt.Errorf("creating OpenSearch client: %w", err) } // initial health check resp, err := osClient.Ping() if err != nil { return nil, fmt.Errorf("OpenSearch client unable to ping: %w", err) } if resp.IsError() { return nil, fmt.Errorf("OpenSearch client received error on ping: %s", resp) } return &OS2{ client: osClient, logger: logger, decoder: &NumberDecoder{}, }, nil } func (c *OS2) IsNotFoundError(err error) bool { var clientErr *osError if errors.As(err, &clientErr) { return clientErr.Status == http.StatusNotFound } return false } func (c *OS2) PutMapping(ctx context.Context, index, body string) error { req := osapi.IndicesPutMappingRequest{ Index: []string{index}, Body: strings.NewReader(body), } resp, err := req.Do(ctx, c.client) if err != nil { return fmt.Errorf("OpenSearch PutMapping: %w", err) } defer closeBody(resp) if resp.IsError() { return c.parseError(resp) } return nil } func (c *OS2) CreateIndex(ctx context.Context, index string) error { req := osapi.IndicesCreateRequest{ Index: index, } resp, err := req.Do(ctx, c.client) if err != nil { return fmt.Errorf("OpenSearch CreateIndex: %w", err) } defer closeBody(resp) if resp.IsError() { return c.parseError(resp) } return nil } func (c *OS2) Count(ctx context.Context, index, query string) (int64, error) { resp, err := c.client.Count(c.client.Count.WithIndex(index), c.client.Count.WithBody(strings.NewReader(query))) if err != nil { return 0, fmt.Errorf("OpenSearch Count: %w", err) } defer closeBody(resp) if resp.IsError() { return 0, c.parseError(resp) } type CountResponse struct { Count int64 `json:"count"` } count := &CountResponse{} if err := c.decoder.Decode(resp.Body, count); err != nil { return 0, fmt.Errorf("decoding Opensearch Count result to int64: %w", err) } return count.Count, nil } func (c *OS2) ClearScroll(ctx context.Context, scrollID string) error { resp, err := c.client.ClearScroll( c.client.ClearScroll.WithContext(ctx), c.client.ClearScroll.WithScrollID(scrollID)) if err != nil { return fmt.Errorf("OpenSearch ClearScroll: %w", err) } defer closeBody(resp) if resp.IsError() { return c.parseError(resp) } return nil } func (c *OS2) Scroll(ctx context.Context, index, body, scrollID string) (*client.Response, error) { var resp *osapi.Response var err error if len(scrollID) != 0 { resp, err = c.client.Scroll( c.client.Scroll.WithScrollID(scrollID), c.client.Scroll.WithScroll(time.Minute), c.client.Scroll.WithContext(ctx), ) } else { // when scrollID is not passed, it is normal search request resp, err = c.client.Search( c.client.Search.WithIndex(index), c.client.Search.WithBody(strings.NewReader(body)), c.client.Search.WithScroll(time.Minute), c.client.Search.WithContext(ctx), ) } if err != nil { return nil, fmt.Errorf("OpenSearch Scroll: %w", err) } defer closeBody(resp) if resp.IsError() { return nil, c.parseError(resp) } var osResponse response var totalHits int64 if err := c.decoder.Decode(resp.Body, &osResponse); err != nil { return nil, fmt.Errorf("decoding OpenSearch result to Response: %w", err) } var hits []*client.SearchHit // no more hits if osResponse.Hits == nil || len(osResponse.Hits.Hits) == 0 { return &client.Response{ ScrollID: osResponse.ScrollID, TookInMillis: osResponse.TookInMillis, Hits: &client.SearchHits{Hits: hits}, }, io.EOF } for _, h := range osResponse.Hits.Hits { hits = append(hits, &client.SearchHit{Source: h.Source}) } if osResponse.Hits.TotalHits != nil { totalHits = osResponse.Hits.TotalHits.Value } return &client.Response{ TookInMillis: osResponse.TookInMillis, TotalHits: totalHits, Hits: &client.SearchHits{Hits: hits}, Aggregations: osResponse.Aggregations, ScrollID: osResponse.ScrollID, }, nil } func (c *OS2) Search(ctx context.Context, index, body string) (*client.Response, error) { resp, err := c.client.Search( c.client.Search.WithContext(ctx), c.client.Search.WithIndex(index), c.client.Search.WithBody(strings.NewReader(body)), ) if err != nil { return nil, fmt.Errorf("OpenSearch Search: %w", err) } defer closeBody(resp) if resp.IsError() { return nil, types.InternalServiceError{ Message: fmt.Sprintf("OpenSearch Search Error: %v", c.parseError(resp)), } } var osResponse response if err := c.decoder.Decode(resp.Body, &osResponse); err != nil { return nil, fmt.Errorf("decoding Opensearch result to Response: %w", err) } var hits []*client.SearchHit var sort []interface{} var totalHits int64 if osResponse.Hits != nil && osResponse.Hits.TotalHits != nil { totalHits = osResponse.Hits.TotalHits.Value for _, h := range osResponse.Hits.Hits { sort = h.Sort hits = append(hits, &client.SearchHit{Source: h.Source}) } } return &client.Response{ TookInMillis: osResponse.TookInMillis, TotalHits: totalHits, Hits: &client.SearchHits{Hits: hits}, Aggregations: osResponse.Aggregations, Sort: sort, }, nil } func (e *osError) Error() string { return fmt.Sprintf("Status code: %d, Type: %s, Reason: %s", e.Status, e.Details.Type, e.Details.Reason) } func (c *OS2) parseError(response *osapi.Response) error { var e osError if err := c.decoder.Decode(response.Body, &e); err != nil { return err } return &e } func closeBody(response *osapi.Response) { if response != nil && response.Body != nil { response.Body.Close() } }