func New()

in pkg/esoutput/esoutput.go [84:181]


// New builds an Elasticsearch-backed output from params.
//
// It consolidates the configuration via GetConsolidatedConfig, creates the
// Elasticsearch client (a Cloud ID takes precedence over a URL), verifies
// basic connectivity and finally creates the bulk indexer that writes to the
// configured index.
//
// An error is returned when the configuration cannot be consolidated, a
// certificate or key file cannot be loaded, the client or the bulk indexer
// cannot be created, or the connectivity check does not end with status 200.
func New(params output.Params) (output.Output, error) {
	params.Logger.Info("Elasticsearch: configuring output")

	config, err := GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
	if err != nil {
		return nil, err
	}

	var esConfig es.Config

	// Cloud id takes precedence over a URL (which is localhost by default)
	if config.CloudID.Valid {
		esConfig.CloudID = config.CloudID.String
	} else if config.Url.Valid {
		// The URL setting may carry several comma-separated addresses.
		esConfig.Addresses = strings.Split(config.Url.String, ",")
	}
	if config.User.Valid {
		esConfig.Username = config.User.String
	}
	if config.Password.Valid {
		esConfig.Password = config.Password.String
	}
	if config.APIKey.Valid {
		esConfig.APIKey = config.APIKey.String
	}
	if config.ServiceAccountToken.Valid {
		esConfig.ServiceToken = config.ServiceAccountToken.String
	}
	if config.CACert.Valid {
		cert, err := os.ReadFile(config.CACert.String)
		if err != nil {
			return nil, fmt.Errorf("reading CA certificate %q: %w", config.CACert.String, err)
		}
		esConfig.CACert = cert
	}

	tlsConfig := &tls.Config{
		InsecureSkipVerify: config.InsecureSkipVerify.Bool,
	}
	// Only attach a client certificate when both halves of the key pair are
	// configured; otherwise leave Certificates unset instead of installing an
	// empty certificate entry.
	if config.ClientCert.Valid && config.ClientKey.Valid {
		clientTLSCert, err := tls.LoadX509KeyPair(config.ClientCert.String, config.ClientKey.String)
		if err != nil {
			return nil, fmt.Errorf("loading client certificate key pair: %w", err)
		}
		tlsConfig.Certificates = []tls.Certificate{clientTLSCert}
	}

	esConfig.Transport = &http.Transport{
		TLSClientConfig: tlsConfig,
	}

	client, err := es.NewClient(esConfig)
	if err != nil {
		return nil, err
	}

	// ensure basic connectivity
	info, err := client.Info()
	if err != nil {
		return nil, err
	}
	// The response body must be closed, otherwise the underlying connection leaks.
	if info.Body != nil {
		defer info.Body.Close()
	}

	switch info.StatusCode {
	case http.StatusOK:
		// Cluster is reachable and the user may call the info API.
	case http.StatusForbidden:
		// The info API requires the 'monitor' privilege and the user might not have that. We can only get a 403 if
		// security is configured on this cluster. Therefore, we call the has privilege API that is guaranteed to work
		// for every user.
		priv, err := client.Security.HasPrivileges(strings.NewReader(fmt.Sprintf(hasPrivilegesBody, config.IndexName.String)))
		if err != nil {
			return nil, err
		}
		if priv.Body != nil {
			defer priv.Body.Close()
		}
		if priv.StatusCode != http.StatusOK {
			return nil, fmt.Errorf("cannot connect to Elasticsearch (status code %d)", priv.StatusCode)
		}
	default:
		return nil, fmt.Errorf("cannot connect to Elasticsearch (status code %d)", info.StatusCode)
	}

	bulkIndexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:  config.IndexName.String,
		Client: client,
		OnError: func(ctx context.Context, err error) {
			// this happens usually due to permission issues
			params.Logger.Errorf("Could not write metrics: %s", err)
		},
	})
	if err != nil {
		// %w keeps the original error available to errors.Is / errors.As.
		return nil, fmt.Errorf("error creating the indexer: %w", err)
	}

	return &Output{
		client:      client,
		bulkIndexer: bulkIndexer,
		config:      config,
		logger:      params.Logger,
	}, nil
}