datasource/opensearch/opensearch.go (69 lines of code) (raw):
package opensearch
import (
"fmt"
"sync"
openSearchClient "github.com/alibaba/pairec/v2/datasource/opensearch/client"
"github.com/alibaba/pairec/v2/recconf"
util "github.com/alibabacloud-go/tea-utils/service"
"github.com/alibabacloud-go/tea/tea"
)
// OpenSearchClient pairs an OpenSearch SDK client with the runtime options
// that NewOpenSearchClient configures alongside it.
type OpenSearchClient struct {
// OpenSearchClient is the underlying SDK client created by NewOpenSearchClient.
OpenSearchClient *openSearchClient.Client
// Runtime holds the runtime options (timeouts, retry, SSL and idle-connection
// settings) populated by NewOpenSearchClient.
// NOTE(review): presumably passed to the SDK on each request — confirm at call sites.
Runtime *util.RuntimeOptions
}
var (
// mu is held by GetOpenSearchClient (read lock) and RegisterOpenSearchClient
// (write lock) while they access opensearchInstances.
// NOTE(review): Load inspects opensearchInstances without taking mu.
mu sync.RWMutex
// opensearchInstances is the package-level registry mapping a name to its
// registered client.
opensearchInstances = make(map[string]*OpenSearchClient)
)
// GetOpenSearchClient looks up the client registered under name.
//
// The lookup runs under the registry's read lock. It returns a non-nil error
// when no client has been registered with that name.
func GetOpenSearchClient(name string) (*OpenSearchClient, error) {
	mu.RLock()
	defer mu.RUnlock()

	client, found := opensearchInstances[name]
	if !found {
		return nil, fmt.Errorf("opensearchClient not found, name:%s", name)
	}
	return client, nil
}
// RegisterOpenSearchClient stores client in the registry under name.
//
// The first registration for a given name wins: when name is already present
// the existing entry is kept and client is ignored. The check and the insert
// both happen under the registry's write lock.
func RegisterOpenSearchClient(name string, client *OpenSearchClient) {
	mu.Lock()
	defer mu.Unlock()

	if _, taken := opensearchInstances[name]; taken {
		return
	}
	opensearchInstances[name] = client
}
// NewOpenSearchClient builds an OpenSearchClient for the given endpoint and
// access credentials, together with a fixed set of runtime options.
//
// It panics if the underlying SDK client cannot be created; the signature has
// no error return, so callers cannot observe the failure any other way.
func NewOpenSearchClient(endpoint, accessId, accessKey string) *OpenSearchClient {
	sdkClient, err := openSearchClient.NewClient(&openSearchClient.Config{
		Endpoint:        tea.String(endpoint),
		AccessKeyId:     tea.String(accessId),
		AccessKeySecret: tea.String(accessKey),
	})
	if err != nil {
		panic(err)
	}

	return &OpenSearchClient{
		OpenSearchClient: sdkClient,
		// NOTE(review): timeout values are presumably milliseconds — confirm
		// against the tea-utils RuntimeOptions documentation.
		Runtime: &util.RuntimeOptions{
			ConnectTimeout: tea.Int(2000),
			ReadTimeout:    tea.Int(1000),
			Autoretry:      tea.Bool(false),
			IgnoreSSL:      tea.Bool(false),
			MaxIdleConns:   tea.Int(50),
		},
	}
}
// Init is a no-op: it performs no work and always returns nil.
func (c *OpenSearchClient) Init() error {
	return nil
}
// Load creates and registers an OpenSearch client for every entry in
// config.OpenSearchConfs whose name is not already in the registry.
//
// Names that are already registered are skipped, so calling Load again with
// the same configuration does not replace existing clients. Load panics if a
// client's Init method returns an error (and NewOpenSearchClient itself panics
// when the SDK client cannot be constructed).
func Load(config *recconf.RecommendConfig) {
	for name, conf := range config.OpenSearchConfs {
		// The registry is shared with GetOpenSearchClient and
		// RegisterOpenSearchClient, so the lookup must hold the read lock:
		// an unguarded map read is a data race against a concurrent
		// registration. The lock is released before constructing and
		// registering because RegisterOpenSearchClient takes the write lock.
		mu.RLock()
		_, registered := opensearchInstances[name]
		mu.RUnlock()
		if registered {
			continue
		}

		client := NewOpenSearchClient(conf.EndPoint, conf.AccessKeyId, conf.AccessKeySecret)
		if err := client.Init(); err != nil {
			panic(err)
		}
		// RegisterOpenSearchClient re-checks the name under the write lock, so
		// if another goroutine registered it in the meantime the first entry
		// is kept.
		RegisterOpenSearchClient(name, client)
	}
}