libbeat/outputs/elasticsearch/client.go (394 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package elasticsearch

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"strings"
	"time"

	"go.elastic.co/apm/v2"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/beat/events"
	"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
	"github.com/elastic/beats/v7/libbeat/outputs"
	"github.com/elastic/beats/v7/libbeat/outputs/outil"
	"github.com/elastic/beats/v7/libbeat/publisher"
	"github.com/elastic/elastic-agent-libs/logp"
	"github.com/elastic/elastic-agent-libs/mapstr"
	"github.com/elastic/elastic-agent-libs/periodic"
	"github.com/elastic/elastic-agent-libs/testing"
	"github.com/elastic/elastic-agent-libs/version"
)

var (
	// errPayloadTooLarge is logged when a bulk request is answered with
	// HTTP 413 (Request Entity Too Large) and the batch cannot be split any
	// further, so the batch is dropped.
	errPayloadTooLarge = errors.New("the bulk payload is too large for the server. Consider to adjust `http.max_content_length` parameter in Elasticsearch or `bulk_max_size` in the beat. The batch has been dropped")

	// ErrTooOld signals that the Elasticsearch instance is older than this
	// output supports.
	// NOTE(review): not referenced in this part of the file; presumably
	// returned by the version check run on connect - confirm against callers.
	ErrTooOld = errors.New("Elasticsearch is too old. Please upgrade the instance. If you would like to connect to older instances set output.elasticsearch.allow_older_versions to true") //nolint:staticcheck //false positive
)

// Client is an elasticsearch client.
type Client struct { conn eslegclient.Connection indexSelector outputs.IndexSelector pipelineSelector *outil.Selector observer outputs.Observer // If deadLetterIndex is set, events with bulk-ingest errors will be // forwarded to this index. Otherwise, they will be dropped. deadLetterIndex string log *logp.Logger pLogIndex *periodic.Doer pLogIndexTryDeadLetter *periodic.Doer pLogDeadLetter *periodic.Doer } // clientSettings contains the settings for a client. type clientSettings struct { connection eslegclient.ConnectionSettings indexSelector outputs.IndexSelector pipelineSelector *outil.Selector // The metrics observer from the clientSettings, or a no-op placeholder if // none is provided. This variable is always non-nil for a client created // via NewClient. observer outputs.Observer // If deadLetterIndex is set, events with bulk-ingest errors will be // forwarded to this index. Otherwise, they will be dropped. deadLetterIndex string } type bulkResultStats struct { acked int // number of events ACKed by Elasticsearch duplicates int // number of events failed with `create` due to ID already being indexed fails int // number of events with retryable failures. nonIndexable int // number of events with permanent failures. deadLetter int // number of failed events ingested to the dead letter index. tooMany int // number of events receiving HTTP 429 Too Many Requests } type bulkResult struct { // A connection-level error if the request couldn't be sent or the response // couldn't be read. This error is returned from (*Client).Publish to signal // to the pipeline that this output worker needs to be reconnected before the // next Publish call. connErr error // The array of events sent via bulk request. This excludes any events that // had encoding errors while assembling the request. events []publisher.Event // The http status returned by the bulk request. status int // The API response from Elasticsearch. 
response eslegclient.BulkResponse } const ( defaultEventType = "doc" ) // Flags passed with the Bulk API request: we filter the response to include // only the fields we need for checking request/item state. var bulkRequestParams = map[string]string{ "filter_path": "errors,items.*.error,items.*.status", } // NewClient instantiates a new client. func NewClient( s clientSettings, onConnect *callbacksRegistry, logger *logp.Logger, ) (*Client, error) { pipeline := s.pipelineSelector if pipeline != nil && pipeline.IsEmpty() { pipeline = nil } conn, err := eslegclient.NewConnection(s.connection) if err != nil { return nil, err } conn.OnConnectCallback = func(conn *eslegclient.Connection) error { globalCallbackRegistry.mutex.Lock() defer globalCallbackRegistry.mutex.Unlock() for _, callback := range globalCallbackRegistry.callbacks { err := callback(conn) if err != nil { return err } } if onConnect != nil { onConnect.mutex.Lock() defer onConnect.mutex.Unlock() for _, callback := range onConnect.callbacks { err := callback(conn) if err != nil { return err } } } return nil } // Make sure there's a non-nil observer observer := s.observer if observer == nil { observer = outputs.NewNilObserver() } log := logger.Named("elasticsearch") pLogDeadLetter := periodic.NewDoer(10*time.Second, func(count uint64, d time.Duration) { log.Errorf( "Failed to deliver to dead letter index %d events in last %s. Look at the event log to view the event and cause.", count, d) }) pLogIndex := periodic.NewDoer(10*time.Second, func(count uint64, d time.Duration) { log.Warnf( "Failed to index %d events in last %s: events were dropped! Look at the event log to view the event and cause.", count, d) }) pLogIndexTryDeadLetter := periodic.NewDoer(10*time.Second, func(count uint64, d time.Duration) { log.Warnf( "Failed to index %d events in last %s: tried dead letter index. 
Look at the event log to view the event and cause.", count, d) }) pLogDeadLetter.Start() pLogIndex.Start() pLogIndexTryDeadLetter.Start() client := &Client{ conn: *conn, indexSelector: s.indexSelector, pipelineSelector: pipeline, observer: observer, deadLetterIndex: s.deadLetterIndex, log: log, pLogDeadLetter: pLogDeadLetter, pLogIndex: pLogIndex, pLogIndexTryDeadLetter: pLogIndexTryDeadLetter, } return client, nil } // Clone clones a client. func (client *Client) Clone() *Client { // when cloning the connection callback and params are not copied. A // client's close is for example generated for topology-map support. With params // most likely containing the ingest node pipeline and default callback trying to // create install a template, we don't want these to be included in the clone. connection := eslegclient.ConnectionSettings{ URL: client.conn.URL, Beatname: client.conn.Beatname, Kerberos: client.conn.Kerberos, Username: client.conn.Username, Password: client.conn.Password, APIKey: client.conn.APIKey, Parameters: nil, // XXX: do not pass params? Headers: client.conn.Headers, CompressionLevel: client.conn.CompressionLevel, OnConnectCallback: nil, Observer: nil, EscapeHTML: false, Transport: client.conn.Transport, } // Without the following nil check on proxyURL, a nil Proxy field will try // reloading proxy settings from the environment instead of leaving them // empty. client.conn.Transport.Proxy.Disable = client.conn.Transport.Proxy.URL == nil c, _ := NewClient( clientSettings{ connection: connection, indexSelector: client.indexSelector, pipelineSelector: client.pipelineSelector, deadLetterIndex: client.deadLetterIndex, }, nil, // XXX: do not pass connection callback? 
client.log, ) return c } func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error { span, ctx := apm.StartSpan(ctx, "publishEvents", "output") defer span.End() span.Context.SetLabel("events_original", len(batch.Events())) client.observer.NewBatch(len(batch.Events())) // Create and send the bulk request. bulkResult := client.doBulkRequest(ctx, batch) span.Context.SetLabel("events_encoded", len(bulkResult.events)) if bulkResult.connErr != nil { // If there was a connection-level error there is no per-item response, // handle it and return. return client.handleBulkResultError(ctx, batch, bulkResult) } span.Context.SetLabel("events_published", len(bulkResult.events)) // At this point we have an Elasticsearch response for our request, // check and report the per-item results. eventsToRetry, stats := client.bulkCollectPublishFails(bulkResult) stats.reportToObserver(client.observer) if len(eventsToRetry) > 0 { span.Context.SetLabel("events_failed", len(eventsToRetry)) batch.RetryEvents(eventsToRetry) } else { batch.ACK() } return nil } // Encode a batch's events into a bulk publish request, send the request to // Elasticsearch, and return the resulting metadata. // Reports the network request latency to the client's metrics observer. // The events list in the result will be shorter than the original batch if // some events couldn't be encoded. In this case, the removed events will // be reported to the Client's metrics observer via PermanentErrors. func (client *Client) doBulkRequest( ctx context.Context, batch publisher.Batch, ) bulkResult { var result bulkResult rawEvents := batch.Events() // encode events into bulk request buffer, dropping failed elements from // events slice resultEvents, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), rawEvents) result.events = resultEvents client.observer.PermanentErrors(len(rawEvents) - len(resultEvents)) // If we encoded any events, send the network request. 
if len(result.events) > 0 { begin := time.Now() result.status, result.response, result.connErr = client.conn.Bulk(ctx, "", "", bulkRequestParams, bulkItems) if result.connErr == nil { duration := time.Since(begin) client.observer.ReportLatency(duration) client.log.Debugf( "doBulkRequest: %d events have been sent to elasticsearch in %v.", len(result.events), duration) } } return result } func (client *Client) handleBulkResultError( ctx context.Context, batch publisher.Batch, bulkResult bulkResult, ) error { if bulkResult.status == http.StatusRequestEntityTooLarge { if batch.SplitRetry() { // Report that we split a batch client.observer.BatchSplit() client.observer.RetryableErrors(len(bulkResult.events)) } else { // If the batch could not be split, there is no option left but // to drop it and log the error state. batch.Drop() client.observer.PermanentErrors(len(bulkResult.events)) client.log.Error(errPayloadTooLarge) } // Don't propagate a too-large error since it doesn't indicate a problem // with the connection. return nil } err := apm.CaptureError(ctx, fmt.Errorf("failed to perform any bulk index operations: %w", bulkResult.connErr)) err.Send() client.log.Error(err) if len(bulkResult.events) > 0 { // At least some events failed, retry them batch.RetryEvents(bulkResult.events) } else { // All events were sent successfully batch.ACK() } client.observer.RetryableErrors(len(bulkResult.events)) return bulkResult.connErr } // bulkEncodePublishRequest encodes all bulk requests and returns slice of events // successfully added to the list of bulk items and the list of bulk items. 
func (client *Client) bulkEncodePublishRequest(version version.V, data []publisher.Event) ([]publisher.Event, []interface{}) { okEvents := data[:0] bulkItems := []interface{}{} for i := range data { if data[i].EncodedEvent == nil { client.log.Error("Elasticsearch output received unencoded publisher.Event") continue } event := data[i].EncodedEvent.(*encodedEvent) //nolint:errcheck //safe to ignore type check if event.err != nil { // This means there was an error when encoding the event and it isn't // ingestable, so report the error and continue. client.log.Error(event.err) continue } meta, err := client.createEventBulkMeta(version, event) if err != nil { client.log.Errorf("Failed to encode event meta data: %+v", err) continue } if event.opType == events.OpTypeDelete { // We don't include the event source in a bulk DELETE bulkItems = append(bulkItems, meta) } else { // Wrap the encoded event in a RawEncoding so the Elasticsearch client // knows not to re-encode it bulkItems = append(bulkItems, meta, eslegclient.RawEncoding{Encoding: event.encoding}) } okEvents = append(okEvents, data[i]) } return okEvents, bulkItems } func (client *Client) createEventBulkMeta(version version.V, event *encodedEvent) (interface{}, error) { eventType := "" if version.Major < 7 { eventType = defaultEventType } meta := eslegclient.BulkMeta{ Index: event.index, DocType: eventType, Pipeline: event.pipeline, ID: event.id, } if event.opType == events.OpTypeDelete { if event.id != "" { return eslegclient.BulkDeleteAction{Delete: meta}, nil } else { return nil, fmt.Errorf("%s %s requires _id", events.FieldMetaOpType, events.OpTypeDelete) } } if event.id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) { if event.opType == events.OpTypeIndex { return eslegclient.BulkIndexAction{Index: meta}, nil } return eslegclient.BulkCreateAction{Create: meta}, nil } return eslegclient.BulkIndexAction{Index: meta}, nil } func getPipeline(event *beat.Event, defaultSelector 
*outil.Selector) (string, error) { if event.Meta != nil { pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline) if errors.Is(err, mapstr.ErrKeyNotFound) { return "", nil } if err != nil { return "", errors.New("pipeline metadata is no string") } return strings.ToLower(pipeline), nil } if defaultSelector != nil { return defaultSelector.Select(event) } return "", nil } // bulkCollectPublishFails checks per item errors returning all events // to be tried again due to error code returned for that items. If indexing an // event failed due to some error in the event itself (e.g. does not respect mapping), // the event will be dropped. // Each of the events will be reported in the returned stats as exactly one of // acked, duplicates, fails, nonIndexable, or deadLetter. func (client *Client) bulkCollectPublishFails(bulkResult bulkResult) ([]publisher.Event, bulkResultStats) { events := bulkResult.events if len(bulkResult.events) == 0 { // No events to process return nil, bulkResultStats{} } if bulkResult.status != 200 { return events, bulkResultStats{fails: len(events)} } reader := newJSONReader(bulkResult.response) if err := bulkReadToItems(reader); err != nil { client.log.Errorf("failed to parse bulk response: %v", err.Error()) return events, bulkResultStats{fails: len(events)} } count := len(events) eventsToRetry := events[:0] stats := bulkResultStats{} for i := 0; i < count; i++ { itemStatus, itemMessage, err := bulkReadItemStatus(client.log, reader) if err != nil { // The response json is invalid, mark the remaining events for retry. stats.fails += count - i eventsToRetry = append(eventsToRetry, events[i:]...) 
break } if client.applyItemStatus(events[i], itemStatus, itemMessage, &stats) { eventsToRetry = append(eventsToRetry, events[i]) client.log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, itemStatus, itemMessage) } } return eventsToRetry, stats } // applyItemStatus processes the ingestion status of one event from a bulk request. // Returns true if the item should be retried. // In the provided bulkResultStats, applyItemStatus increments exactly one of: // acked, duplicates, deadLetter, fails, nonIndexable. func (client *Client) applyItemStatus( event publisher.Event, itemStatus int, itemMessage []byte, stats *bulkResultStats, ) bool { encodedEvent := event.EncodedEvent.(*encodedEvent) //nolint:errcheck //safe to ignore type check if itemStatus < 300 { if encodedEvent.deadLetter { // This was ingested into the dead letter index, not the original target stats.deadLetter++ } else { stats.acked++ } return false // no retry needed } if itemStatus == 409 { // 409 is used to indicate there is already an event with the same ID, or // with identical Time Series Data Stream dimensions when TSDS is active. stats.duplicates++ return false // no retry needed } if itemStatus == http.StatusTooManyRequests { stats.fails++ stats.tooMany++ return true } if itemStatus < 500 { // hard failure, apply policy action if encodedEvent.deadLetter { // Fatal error while sending an already-failed event to the dead letter // index, drop. client.pLogDeadLetter.Add() client.log.Errorw(fmt.Sprintf("Can't deliver to dead letter index event '%s' (status=%v): %s", encodedEvent, itemStatus, itemMessage), logp.TypeKey, logp.EventType) stats.nonIndexable++ return false } if client.deadLetterIndex == "" { // Fatal error and no dead letter index, drop. 
client.pLogIndex.Add() client.log.Warnw(fmt.Sprintf("Cannot index event '%s' (status=%v): %s, dropping event!", encodedEvent, itemStatus, itemMessage), logp.TypeKey, logp.EventType) stats.nonIndexable++ return false } // Send this failure to the dead letter index and "retry". // We count this as a "retryable failure", and then if the dead letter // ingestion succeeds it is counted in the "deadLetter" counter // rather than the "acked" counter. client.pLogIndexTryDeadLetter.Add() client.log.Warnw(fmt.Sprintf("Cannot index event '%s' (status=%v): %s, trying dead letter index", encodedEvent, itemStatus, itemMessage), logp.TypeKey, logp.EventType) encodedEvent.setDeadLetter(client.deadLetterIndex, itemStatus, string(itemMessage)) } // Everything else gets retried. stats.fails++ return true } func (client *Client) Connect(ctx context.Context) error { return client.conn.Connect(ctx) } func (client *Client) Close() error { return client.conn.Close() } func (client *Client) String() string { return "elasticsearch(" + client.conn.URL + ")" } func (client *Client) Test(d testing.Driver) { client.conn.Test(d) } func (stats bulkResultStats) reportToObserver(ob outputs.Observer) { ob.AckedEvents(stats.acked) ob.RetryableErrors(stats.fails) ob.PermanentErrors(stats.nonIndexable) ob.DuplicateEvents(stats.duplicates) ob.DeadLetterEvents(stats.deadLetter) ob.ErrTooMany(stats.tooMany) }