pkg/esoutput/esoutput.go (202 lines of code) (raw):
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* This project is based on a modification of
* https://github.com/grafana/xk6-output-prometheus-remote which
* is licensed under the Apache 2.0 License.
*
*/
package esoutput
import (
"bytes"
"context"
"crypto/tls"
_ "embed"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"time"
es "github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/sirupsen/logrus"
"go.k6.io/k6/output"
)
type elasticMetricEntry struct {
MetricName string
MetricType string
Value float64
Tags map[string]string
Time time.Time
}
type Output struct {
config Config
client *es.Client
bulkIndexer esutil.BulkIndexer
periodicFlusher *output.PeriodicFlusher
output.SampleBuffer
logger logrus.FieldLogger
}
const hasPrivilegesBody = `{
"index": [
{
"names": [
"%s"
],
"privileges": [
"write", "create_index"
]
}
]
}`
var _ output.Output = new(Output)
//go:embed mapping.json
var mapping []byte
func New(params output.Params) (output.Output, error) {
params.Logger.Info("Elasticsearch: configuring output")
config, err := GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
if err != nil {
return nil, err
}
var addresses = []string{config.Url.ValueOrZero()}
var esConfig es.Config
// Cloud id takes precedence over a URL (which is localhost by default)
if config.CloudID.Valid {
esConfig.CloudID = config.CloudID.String
} else if config.Url.Valid {
esConfig.Addresses = strings.Split(strings.Join(addresses, ""), ",")
}
if config.User.Valid {
esConfig.Username = config.User.String
}
if config.Password.Valid {
esConfig.Password = config.Password.String
}
if config.APIKey.Valid {
esConfig.APIKey = config.APIKey.String
}
if config.ServiceAccountToken.Valid {
esConfig.ServiceToken = config.ServiceAccountToken.String
}
if config.CACert.Valid {
cert, err := os.ReadFile(config.CACert.String)
if err != nil {
return nil, err
}
esConfig.CACert = cert
}
var clientTLSCert tls.Certificate
if config.ClientCert.Valid && config.ClientKey.Valid {
clientTLSCert, err = tls.LoadX509KeyPair(config.ClientCert.String, config.ClientKey.String)
if err != nil {
return nil, err
}
}
esConfig.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: config.InsecureSkipVerify.Bool,
Certificates: []tls.Certificate{clientTLSCert},
},
}
client, err := es.NewClient(esConfig)
if err != nil {
return nil, err
}
// ensure basic connectivity
info, err := client.Info()
if err != nil {
return nil, err
}
if info.StatusCode != 200 {
// The info API requires the 'monitor' privilege and the user might not have that. We can only get a 403 if
// security is configured on this cluster. Therefore, we call the has privilege API that is guaranteed to work
//for every user.
if info.StatusCode == 403 {
priv, err := client.Security.HasPrivileges(strings.NewReader(fmt.Sprintf(hasPrivilegesBody, config.IndexName.String)))
if err != nil {
return nil, err
}
if priv.StatusCode != 200 {
return nil, fmt.Errorf("cannot connect to Elasticsearch (status code %d)", priv.StatusCode)
}
} else {
return nil, fmt.Errorf("cannot connect to Elasticsearch (status code %d)", info.StatusCode)
}
}
bulkIndexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: config.IndexName.String,
Client: client,
OnError: func(ctx context.Context, err error) {
// this happens usually due to permission issues
params.Logger.Errorf("Could not write metrics: %s", err)
},
})
if err != nil {
return nil, fmt.Errorf("error creating the indexer: %v", err)
}
return &Output{
client: client,
bulkIndexer: bulkIndexer,
config: config,
logger: params.Logger,
}, nil
}
func (*Output) Description() string {
return "Output k6 metrics to Elasticsearch"
}
func (o *Output) Start() error {
indexName := o.config.IndexName.String
res, err := o.client.Indices.Create(indexName, o.client.Indices.Create.WithBody(bytes.NewReader(mapping)))
if err != nil {
return err
}
// 400 usually happens when the index already exists, which is ok for our purposes.
if res.StatusCode > 400 {
body, err := io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("could not read response after failure to create index %s: %v", indexName, err)
}
return fmt.Errorf("could not create index %s: %s", indexName, body)
}
res.Body.Close()
if periodicFlusher, err := output.NewPeriodicFlusher(time.Duration(o.config.FlushPeriod.Duration), o.flush); err != nil {
return err
} else {
o.periodicFlusher = periodicFlusher
}
o.logger.Debugf("Elasticsearch: starting writing to index %s", indexName)
return nil
}
func (o *Output) Stop() error {
o.logger.Debug("Elasticsearch: stopping writing")
o.periodicFlusher.Stop()
if err := o.bulkIndexer.Close(context.Background()); err != nil {
log.Fatalf("Elasticsearch: Could not close bulk indexer: %s", err)
}
return nil
}
func (o *Output) blkItemErrHandler(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
if err != nil {
o.logger.Errorf("%s", err)
} else {
o.logger.Errorf("%s: %s", res.Error.Type, res.Error.Reason)
}
}
func (o *Output) flush() {
samplesContainers := o.GetBufferedSamples()
for _, samplesContainer := range samplesContainers {
samples := samplesContainer.GetSamples()
for _, sample := range samples {
mappedEntry := elasticMetricEntry{
MetricName: sample.Metric.Name,
MetricType: sample.Metric.Type.String(),
Value: sample.Value,
Tags: sample.GetTags().Map(),
Time: sample.Time,
}
data, err := json.Marshal(mappedEntry)
if err != nil {
o.logger.Fatalf("Cannot encode document: %s, %s", err, mappedEntry)
}
var item = esutil.BulkIndexerItem{
Action: "create",
Body: bytes.NewReader(data),
OnFailure: o.blkItemErrHandler,
}
err = o.bulkIndexer.Add(
context.Background(),
item,
)
if err != nil {
log.Fatalf("Unexpected error: %s", err)
}
}
}
}