search-elasticsearch/es.go (256 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package es import ( "context" "embed" "encoding/json" "fmt" "strings" "sync" "github.com/apache/answer-plugins/search-elasticsearch/i18n" "github.com/apache/answer-plugins/util" "github.com/apache/answer/plugin" "github.com/olivere/elastic/v7" "github.com/segmentfault/pacman/log" ) //go:embed info.yaml var Info embed.FS type SearchEngine struct { Config *SearchEngineConfig Operator *Operator syncer plugin.SearchSyncer syncing bool lock sync.Mutex } type SearchEngineConfig struct { Endpoints string `json:"endpoints"` Username string `json:"username"` Password string `json:"password"` } func init() { plugin.Register(&SearchEngine{ Config: &SearchEngineConfig{}, lock: sync.Mutex{}, }) } func (s *SearchEngine) Info() plugin.Info { info := &util.Info{} info.GetInfo(Info) return plugin.Info{ Name: plugin.MakeTranslator(i18n.InfoName), SlugName: info.SlugName, Description: plugin.MakeTranslator(i18n.InfoDescription), Author: info.Author, Version: info.Version, Link: info.Link, } } func (s *SearchEngine) Description() plugin.SearchDesc { return plugin.SearchDesc{} } func (s *SearchEngine) SearchContents( ctx context.Context, cond *plugin.SearchBasicCond) ( res []plugin.SearchResult, total int64, err error) { if s.Operator == nil { return nil, 0, fmt.Errorf("es client not init") } resp, err := s.Operator.QueryDoc(ctx, s.getIndexName(), s.buildQuery(cond), s.buildSort(cond), s.buildCols(), cond.Page, cond.PageSize) if err != nil { return nil, 0, fmt.Errorf("es query error: %w", err) } if resp == nil { return nil, 0, nil } return s.warpResult(resp) } func (s *SearchEngine) SearchQuestions( ctx context.Context, cond *plugin.SearchBasicCond) ( res []plugin.SearchResult, total int64, err error) { if s.Operator == nil { return nil, 0, fmt.Errorf("es client not init") } query := s.buildQuery(cond) query.Must(elastic.NewTermQuery("type", "question")) resp, err := s.Operator.QueryDoc(ctx, s.getIndexName(), query, s.buildSort(cond), s.buildCols(), cond.Page, cond.PageSize) if err != nil { return nil, 0, fmt.Errorf("es query error: %w", err) } if resp == nil { return nil, 0, nil } return s.warpResult(resp) } func (s *SearchEngine) SearchAnswers( ctx context.Context, cond *plugin.SearchBasicCond) ( res []plugin.SearchResult, total int64, err error) { if s.Operator == nil { return nil, 0, fmt.Errorf("es client not init") } query := s.buildQuery(cond) query.Must(elastic.NewTermQuery("type", "answer")) resp, err := s.Operator.QueryDoc(ctx, s.getIndexName(), query, s.buildSort(cond), s.buildCols(), cond.Page, cond.PageSize) if err != nil { return nil, 0, fmt.Errorf("es query error: %w", err) } if resp == nil { return nil, 0, nil } return s.warpResult(resp) } func (s *SearchEngine) UpdateContent(ctx context.Context, content *plugin.SearchContent) error { if s.Operator == nil { return fmt.Errorf("es client not init") } return s.Operator.SaveDoc(ctx, s.getIndexName(), content.ObjectID, CreateDocFromSearchContent(content.ObjectID, content)) } func (s *SearchEngine) DeleteContent(ctx context.Context, contentID string) error { if s.Operator == nil { return fmt.Errorf("es client not init") } return s.Operator.DeleteDoc(ctx, s.getIndexName(), contentID) } func (s *SearchEngine) RegisterSyncer(ctx context.Context, syncer plugin.SearchSyncer) { s.syncer = syncer s.sync() } func (s *SearchEngine) warpResult(resp *elastic.SearchResult) ([]plugin.SearchResult, int64, error) { res := make([]plugin.SearchResult, 0) for _, hit := range resp.Hits.Hits { docByte, err := hit.Source.MarshalJSON() if err != nil { log.Errorf("es unmarshal error: %v", err) continue } var content AnswerPostDoc err = json.Unmarshal(docByte, &content) if err != nil { log.Errorf("es unmarshal error: %v", err) continue } res = append(res, plugin.SearchResult{ ID: hit.Id, Type: content.Type, }) } log.Debugf("search result: %d", len(res)) return res, resp.TotalHits(), nil } func (s *SearchEngine) ConfigFields() []plugin.ConfigField { return []plugin.ConfigField{ { Name: "endpoints", Type: plugin.ConfigTypeInput, Title: plugin.MakeTranslator(i18n.ConfigEndpointsTitle), Description: plugin.MakeTranslator(i18n.ConfigEndpointsDescription), Required: true, UIOptions: plugin.ConfigFieldUIOptions{ InputType: plugin.InputTypeText, }, Value: s.Config.Endpoints, }, { Name: "username", Type: plugin.ConfigTypeInput, Title: plugin.MakeTranslator(i18n.ConfigUsernameTitle), Description: plugin.MakeTranslator(i18n.ConfigUsernameDescription), Required: true, UIOptions: plugin.ConfigFieldUIOptions{ InputType: plugin.InputTypeText, }, Value: s.Config.Username, }, { Name: "password", Type: plugin.ConfigTypeInput, Title: plugin.MakeTranslator(i18n.ConfigPasswordTitle), Description: plugin.MakeTranslator(i18n.ConfigPasswordDescription), Required: true, UIOptions: plugin.ConfigFieldUIOptions{ InputType: plugin.InputTypeText, }, Value: s.Config.Password, }, } } func (s *SearchEngine) ConfigReceiver(config []byte) error { conf := &SearchEngineConfig{} _ = json.Unmarshal(config, conf) s.Config = conf log.Debugf("try to init es client: %s", conf.Endpoints) operator, err := NewOperator(strings.Split(conf.Endpoints, ","), conf.Username, conf.Password) if err != nil { return fmt.Errorf("init es client error: %w", err) } s.Operator = operator err = s.Operator.CreateIndex(context.Background(), s.getIndexName(), indexJson) if err != nil { return fmt.Errorf("create index error: %w", err) } return nil } func (s *SearchEngine) getIndexName() string { return "answer_post" } func (s *SearchEngine) buildSort(cond *plugin.SearchBasicCond) (sort *elastic.FieldSort) { switch cond.Order { case plugin.SearchNewestOrder: return elastic.NewFieldSort("created").Desc() case plugin.SearchActiveOrder: return elastic.NewFieldSort("active").Desc() case plugin.SearchScoreOrder: return elastic.NewFieldSort("score").Desc() default: return nil } } func (s *SearchEngine) buildCols() (cols *elastic.FetchSourceContext) { return elastic.NewFetchSourceContext(true).Include("id", "type") } func (s *SearchEngine) buildQuery(cond *plugin.SearchBasicCond) ( query *elastic.BoolQuery) { log.Debugf("build query: %+v", cond) q := elastic.NewBoolQuery() for _, tagGroup := range cond.TagIDs { if len(tagGroup) > 0 { q.Must(elastic.NewTermsQuery("tags", convertToInterfaceSlice(tagGroup)...)) } } if len(cond.UserID) > 0 { q.Must(elastic.NewTermQuery("user_id", cond.UserID)) } if len(cond.QuestionID) > 0 { q.Must(elastic.NewTermQuery("question_id", cond.QuestionID)) } if cond.VoteAmount > 0 { q.Must(elastic.NewRangeQuery("score").Gte(cond.VoteAmount)) } if cond.ViewAmount > 0 { q.Must(elastic.NewRangeQuery("views").Gte(cond.ViewAmount)) } if cond.AnswerAmount > 0 { q.Must(elastic.NewRangeQuery("answers").Gte(cond.AnswerAmount)) } if cond.AnswerAccepted == plugin.AcceptedCondTrue { q.Must(elastic.NewTermQuery("has_accepted", true)) } else if cond.AnswerAccepted == plugin.AcceptedCondFalse { q.MustNot(elastic.NewTermQuery("has_accepted", true)) } if cond.QuestionAccepted == plugin.AcceptedCondTrue { q.MustNot(elastic.NewTermQuery("has_accepted", true)) } else if cond.QuestionAccepted == plugin.AcceptedCondFalse { q.Must(elastic.NewTermQuery("has_accepted", false)) } if len(cond.Words) > 0 { q.Must(elastic.NewMultiMatchQuery(strings.Join(cond.Words, " "), "title", "content")) } q.Must(elastic.NewTermQuery("status", plugin.SearchContentStatusAvailable)) return q } func convertToInterfaceSlice(slice []string) []interface{} { s := make([]interface{}, len(slice)) for i, v := range slice { s[i] = v } return s }