in pkg/esoutput/esoutput.go [84:181]
// New builds the Elasticsearch output from the consolidated configuration.
//
// It assembles the client configuration (endpoint, credentials, TLS
// settings), creates the client, verifies basic connectivity and finally
// creates the bulk indexer used to write metrics.
//
// An error is returned when the configuration cannot be consolidated, a
// certificate file cannot be read or parsed, the client cannot be created,
// the connectivity check fails, or the bulk indexer cannot be created.
func New(params output.Params) (output.Output, error) {
	params.Logger.Info("Elasticsearch: configuring output")

	config, err := GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
	if err != nil {
		return nil, err
	}

	var esConfig es.Config
	// Cloud id takes precedence over a URL (which is localhost by default)
	switch {
	case config.CloudID.Valid:
		esConfig.CloudID = config.CloudID.String
	case config.Url.Valid:
		// The URL setting may hold several comma-separated addresses.
		esConfig.Addresses = strings.Split(config.Url.String, ",")
	}

	if config.User.Valid {
		esConfig.Username = config.User.String
	}
	if config.Password.Valid {
		esConfig.Password = config.Password.String
	}
	if config.APIKey.Valid {
		esConfig.APIKey = config.APIKey.String
	}
	if config.ServiceAccountToken.Valid {
		esConfig.ServiceToken = config.ServiceAccountToken.String
	}

	if config.CACert.Valid {
		cert, err := os.ReadFile(config.CACert.String)
		if err != nil {
			return nil, err
		}
		esConfig.CACert = cert
	}

	tlsConfig := &tls.Config{
		InsecureSkipVerify: config.InsecureSkipVerify.Bool,
	}
	// Only offer a client certificate when both halves of the key pair are
	// configured; otherwise leave Certificates unset instead of installing a
	// zero-value certificate.
	if config.ClientCert.Valid && config.ClientKey.Valid {
		clientTLSCert, err := tls.LoadX509KeyPair(config.ClientCert.String, config.ClientKey.String)
		if err != nil {
			return nil, err
		}
		tlsConfig.Certificates = []tls.Certificate{clientTLSCert}
	}
	esConfig.Transport = &http.Transport{TLSClientConfig: tlsConfig}

	client, err := es.NewClient(esConfig)
	if err != nil {
		return nil, err
	}

	// ensure basic connectivity
	info, err := client.Info()
	if err != nil {
		return nil, err
	}
	// Release the underlying connection once we are done with the response.
	if info.Body != nil {
		defer info.Body.Close()
	}

	switch info.StatusCode {
	case http.StatusOK:
		// Cluster is reachable.
	case http.StatusForbidden:
		// The info API requires the 'monitor' privilege and the user might not have that. We can only get a 403 if
		// security is configured on this cluster. Therefore, we call the has privilege API that is guaranteed to work
		// for every user.
		priv, err := client.Security.HasPrivileges(strings.NewReader(fmt.Sprintf(hasPrivilegesBody, config.IndexName.String)))
		if err != nil {
			return nil, err
		}
		if priv.Body != nil {
			defer priv.Body.Close()
		}
		if priv.StatusCode != http.StatusOK {
			return nil, fmt.Errorf("cannot connect to Elasticsearch (status code %d)", priv.StatusCode)
		}
	default:
		return nil, fmt.Errorf("cannot connect to Elasticsearch (status code %d)", info.StatusCode)
	}

	bulkIndexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:  config.IndexName.String,
		Client: client,
		OnError: func(ctx context.Context, err error) {
			// this happens usually due to permission issues
			params.Logger.Errorf("Could not write metrics: %s", err)
		},
	})
	if err != nil {
		// Wrap with %w so callers can still inspect the underlying error.
		return nil, fmt.Errorf("error creating the indexer: %w", err)
	}

	return &Output{
		client:      client,
		bulkIndexer: bulkIndexer,
		config:      config,
		logger:      params.Logger,
	}, nil
}