pkg/espoll/search.go (139 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package espoll import ( "context" "encoding/json" "fmt" "strings" "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/elastic/go-elasticsearch/v8/esutil" ) // SearchIndexMinDocs searches index with query, returning the results. // // If the search returns fewer than min results within 10 seconds // (by default), SearchIndexMinDocs will return an error. func (es *Client) SearchIndexMinDocs( ctx context.Context, min int, index string, query json.Marshaler, opts ...RequestOption, ) (SearchResult, error) { var result SearchResult req := es.NewSearchRequest(index) req.ExpandWildcards = "open,hidden" if min > 10 { // Size defaults to 10. If the caller expects more than 10, // return it in the search so we don't have to search again. req = req.WithSize(min) } if query != nil { req = req.WithQuery(query) } opts = append(opts, WithCondition(AllCondition( result.Hits.MinHitsCondition(min), result.Hits.TotalHitsCondition(req), ))) // Refresh the indices before issuing the search request. 
refreshReq := esapi.IndicesRefreshRequest{ Index: strings.Split(",", index), ExpandWildcards: "all", } rsp, err := refreshReq.Do(ctx, es.Transport) if err != nil { return result, fmt.Errorf("failed refreshing indices: %s: %w", index, err) } rsp.Body.Close() if _, err := req.Do(ctx, &result, opts...); err != nil { return result, fmt.Errorf("failed issuing request: %w", err) } return result, nil } // NewSearchRequest returns a search request using the wrapped Elasticsearch // client. func (es *Client) NewSearchRequest(index string) *SearchRequest { req := &SearchRequest{es: es} req.Index = strings.Split(index, ",") req.Body = strings.NewReader(`{"fields": ["*"]}`) return req } // SearchRequest wraps an esapi.SearchRequest with a Client. type SearchRequest struct { esapi.SearchRequest es *Client } func (r *SearchRequest) WithQuery(q any) *SearchRequest { var body struct { Query any `json:"query"` Fields []string `json:"fields"` } body.Query = q body.Fields = []string{"*"} r.Body = esutil.NewJSONReader(&body) return r } func (r *SearchRequest) WithSort(fieldDirection ...string) *SearchRequest { r.Sort = fieldDirection return r } func (r *SearchRequest) WithSize(size int) *SearchRequest { r.Size = &size return r } func (r *SearchRequest) Do(ctx context.Context, out *SearchResult, opts ...RequestOption) (*esapi.Response, error) { return r.es.Do(ctx, &r.SearchRequest, out, opts...) } type SearchResult struct { Hits SearchHits `json:"hits"` Aggregations map[string]json.RawMessage `json:"aggregations"` } type SearchHits struct { Total SearchHitsTotal `json:"total"` Hits []SearchHit `json:"hits"` } type SearchHitsTotal struct { Value int `json:"value"` Relation string `json:"relation"` // "eq" or "gte" } // NonEmptyCondition returns a ConditionFunc which will return true if h.Hits is non-empty. 
func (h *SearchHits) NonEmptyCondition() ConditionFunc {
	return h.MinHitsCondition(1)
}

// MinHitsCondition returns a ConditionFunc which will return true if the number of h.Hits
// is at least min.
//
// The returned function reads h each time it is called, so it reflects the
// hits stored in h at that moment.
func (h *SearchHits) MinHitsCondition(min int) ConditionFunc {
	return func(*esapi.Response) bool { return len(h.Hits) >= min }
}

// TotalHitsCondition returns a ConditionFunc which will return true if the number of h.Hits
// is at least h.Total.Value. If the condition returns false, it will update req.Size to
// accommodate the number of hits in the following search.
func (h *SearchHits) TotalHitsCondition(req *SearchRequest) ConditionFunc {
	return func(*esapi.Response) bool {
		if len(h.Hits) >= h.Total.Value {
			return true
		}
		// Use a fresh variable for each update so req.Size never aliases
		// h.Total.Value itself.
		size := h.Total.Value
		req.Size = &size
		return false
	}
}

// SearchHit is a single decoded search hit.
//
// Source and Fields are the decoded forms of the hit's "_source" and
// "fields" sections; RawSource and RawFields keep the corresponding
// undecoded JSON.
type SearchHit struct {
	Index     string
	ID        string
	Score     float64
	Fields    map[string][]any
	Source    map[string]any
	RawSource json.RawMessage
	RawFields json.RawMessage
}

// UnmarshalJSON decodes a search hit from data, populating both the raw and
// the decoded representations of "_source" and "fields".
//
// A section that is absent from the hit leaves the corresponding raw field
// empty and the decoded map empty but non-nil. An error is returned if data
// is not a valid hit object or if a present section cannot be decoded.
func (h *SearchHit) UnmarshalJSON(data []byte) error {
	var searchHit struct {
		Index  string          `json:"_index"`
		ID     string          `json:"_id"`
		Score  float64         `json:"_score"`
		Source json.RawMessage `json:"_source"`
		Fields json.RawMessage `json:"fields"`
	}
	if err := json.Unmarshal(data, &searchHit); err != nil {
		return err
	}
	h.Index = searchHit.Index
	h.ID = searchHit.ID
	h.Score = searchHit.Score
	h.RawSource = searchHit.Source
	h.RawFields = searchHit.Fields
	h.Source = make(map[string]any)
	h.Fields = make(map[string][]any)

	// json.Unmarshal rejects empty input ("unexpected end of JSON input"),
	// so only decode the sections that were actually present in the hit.
	if len(h.RawSource) > 0 {
		if err := json.Unmarshal(h.RawSource, &h.Source); err != nil {
			return fmt.Errorf("error unmarshaling _source: %w", err)
		}
	}
	if len(h.RawFields) > 0 {
		if err := json.Unmarshal(h.RawFields, &h.Fields); err != nil {
			return fmt.Errorf("error unmarshaling fields: %w", err)
		}
	}
	return nil
}

// UnmarshalSource decodes the hit's raw "_source" JSON into out, which must
// be a value json.Unmarshal can decode into (typically a non-nil pointer).
// It returns json.Unmarshal's error, including when RawSource is empty.
func (h *SearchHit) UnmarshalSource(out any) error {
	return json.Unmarshal(h.RawSource, out)
}