kibana/client.go (334 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package kibana import ( "bytes" "context" "encoding/base64" "encoding/json" "errors" "fmt" "io" "mime" "mime/multipart" "net/http" "net/textproto" "net/url" "path" "strings" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/transport/httpcommon" "github.com/elastic/elastic-agent-libs/useragent" "github.com/elastic/elastic-agent-libs/version" ) const statusAPI = "/api/status" type Connection struct { URL string Username string Password string APIKey string ServiceToken string Headers http.Header HTTP *http.Client Version version.V } type Client struct { Connection log *logp.Logger } func addToURL(_url, _path string, params url.Values) string { if len(params) == 0 { return _url + _path } return strings.Join([]string{_url, _path, "?", params.Encode()}, "") } func extractError(result []byte) error { var kibanaResult struct { Message string Attributes struct { Objects []struct { ID string Error struct { Message string } } } } if err := json.Unmarshal(result, &kibanaResult); err != nil { return fmt.Errorf("error extracting JSON for error response: %w", err) } var errs []error if kibanaResult.Message != "" { for _, err := range kibanaResult.Attributes.Objects { errs = append(errs, fmt.Errorf("id: %s, message: %s", err.ID, err.Error.Message)) } if len(errs) == 0 { return fmt.Errorf("%s", kibanaResult.Message) } return fmt.Errorf("%s: %w", kibanaResult.Message, errors.Join(errs...)) } return nil } func extractMessage(result []byte) error { var kibanaResult struct { Success bool Errors []struct { ID string Type string Error struct { Type string References []struct { Type string ID string } } } } if err := json.Unmarshal(result, &kibanaResult); err != nil { return nil //nolint:nilerr // we suppress some malformed errors on purpose } if !kibanaResult.Success { var errs []error for _, err := range kibanaResult.Errors { errs = append(errs, fmt.Errorf("error: %s, asset ID=%s; asset type=%s; references=%+v", err.Error.Type, err.ID, err.Type, err.Error.References)) } return errors.Join(errs...) } return nil } // NewKibanaClient builds and returns a new Kibana client func NewKibanaClient(cfg *config.C, binaryName, version, commit, buildtime string) (*Client, error) { config := DefaultClientConfig() if err := cfg.Unpack(&config); err != nil { return nil, err } return NewClientWithConfig(&config, binaryName, version, commit, buildtime) } // NewClientWithConfig creates and returns a kibana client using the given config func NewClientWithConfig(config *ClientConfig, binaryName, version, commit, buildtime string) (*Client, error) { return NewClientWithConfigDefault(config, 5601, binaryName, version, commit, buildtime) } // NewClientWithConfigDefault creates and returns a kibana client using the given config func NewClientWithConfigDefault(config *ClientConfig, defaultPort int, binaryName, version, commit, buildtime string) (*Client, error) { if err := config.Validate(); err != nil { return nil, err } p := config.Path if config.SpaceID != "" { p = path.Join(p, "s", config.SpaceID) } kibanaURL, err := MakeURL(config.Protocol, p, config.Host, defaultPort) if err != nil { return nil, fmt.Errorf("invalid Kibana host: %w", err) } u, err := url.Parse(kibanaURL) if err != nil { return nil, fmt.Errorf("failed to parse the Kibana URL: %w", err) } username := config.Username password := config.Password if u.User != nil { username = u.User.Username() password, _ = u.User.Password() u.User = nil if config.APIKey != "" && (username != "" || password != "") { return nil, fmt.Errorf("cannot set api_key with username/password in Kibana URL") } // Re-write URL without credentials. kibanaURL = u.String() } log := logp.NewLogger("kibana") log.Infof("Kibana url: %s", kibanaURL) headers := make(http.Header) for k, v := range config.Headers { headers.Set(k, v) } if binaryName == "" { binaryName = "Libbeat" } userAgent := useragent.UserAgent(binaryName, version, commit, buildtime) rt, err := config.Transport.Client(httpcommon.WithHeaderRoundTripper(map[string]string{"User-Agent": userAgent})) if err != nil { return nil, err } client := &Client{ Connection: Connection{ URL: kibanaURL, Username: username, Password: password, APIKey: config.APIKey, ServiceToken: config.ServiceToken, Headers: headers, HTTP: rt, }, log: log, } if !config.IgnoreVersion { if err = client.readVersion(); err != nil { return nil, fmt.Errorf("fail to get the Kibana version: %w", err) } } return client, nil } func (conn *Connection) Request(method, extraPath string, params url.Values, headers http.Header, body io.Reader) (int, []byte, error) { resp, err := conn.Send(method, extraPath, params, headers, body) if err != nil { return 0, nil, fmt.Errorf("fail to execute the HTTP %s request: %w", method, err) } defer resp.Body.Close() result, err := io.ReadAll(resp.Body) if err != nil { return 0, nil, fmt.Errorf("fail to read response: %w", err) } var retError error if resp.StatusCode >= 300 { retError = extractError(result) } else { retError = extractMessage(result) } return resp.StatusCode, result, retError } // Send an application/json request to Kibana with appropriate kbn headers func (conn *Connection) Send(method, extraPath string, params url.Values, headers http.Header, body io.Reader) (*http.Response, error) { return conn.SendWithContext(context.Background(), method, extraPath, params, headers, body) } // SendWithContext sends an application/json request to Kibana with appropriate kbn headers and the given context. func (conn *Connection) SendWithContext(ctx context.Context, method, extraPath string, params url.Values, headers http.Header, body io.Reader) (*http.Response, error) { reqURL := addToURL(conn.URL, extraPath, params) req, err := http.NewRequestWithContext(ctx, method, reqURL, body) if err != nil { return nil, fmt.Errorf("fail to create the HTTP %s request: %w", method, err) } if conn.Username != "" || conn.Password != "" { req.SetBasicAuth(conn.Username, conn.Password) } if conn.APIKey != "" { v := "ApiKey " + base64.StdEncoding.EncodeToString([]byte(conn.APIKey)) req.Header.Set("Authorization", v) } if conn.ServiceToken != "" { v := "Bearer " + conn.ServiceToken req.Header.Set("Authorization", v) } addHeaders(req.Header, conn.Headers) addHeaders(req.Header, headers) contentType := req.Header.Get("Content-Type") contentType, _, _ = mime.ParseMediaType(contentType) if contentType != "multipart/form-data" && contentType != "application/ndjson" { req.Header.Set("Content-Type", "application/json") } req.Header.Set("Accept", "application/json") req.Header.Set("kbn-xsrf", "1") return conn.RoundTrip(req) } func addHeaders(out, in http.Header) { for k, vs := range in { for _, v := range vs { out.Add(k, v) } } } // Implements RoundTrip interface func (conn *Connection) RoundTrip(r *http.Request) (*http.Response, error) { return conn.HTTP.Do(r) } func (client *Client) readVersion() error { type kibanaVersionResponse struct { Name string `json:"name"` Version struct { Number string `json:"number"` Snapshot bool `json:"build_snapshot"` } `json:"version"` } code, result, err := client.Connection.Request("GET", statusAPI, nil, nil, nil) if err != nil { return fmt.Errorf("HTTP GET request to %s/api/status fails: %w (status=%d). Response: %s", client.Connection.URL, err, code, truncateString(result)) } if code >= 400 { return fmt.Errorf("HTTP GET request to %s/api/status fails: status=%d. Response: %s", client.Connection.URL, code, truncateString(result)) } var versionString string var kibanaVersion kibanaVersionResponse err = json.Unmarshal(result, &kibanaVersion) if err != nil { return fmt.Errorf("fail to unmarshal the response from GET %s/api/status. Response: %s. Kibana status api returns: %w", client.Connection.URL, truncateString(result), err) } versionString = kibanaVersion.Version.Number if kibanaVersion.Version.Snapshot { // needed for the tests versionString += "-SNAPSHOT" } version, err := version.New(versionString) if err != nil { return fmt.Errorf("fail to parse kibana version (%v): %w", versionString, err) } client.Version = *version return nil } // GetVersion returns the version read from kibana. The version is not set if // IgnoreVersion was set when creating the client. func (client *Client) GetVersion() version.V { return client.Version } // KibanaIsServerless returns true if we're talking to a serverless instance. func (client *Client) KibanaIsServerless() (bool, error) { type apiStatus struct { Version struct { BuildFlavor string `json:"build_flavor"` } `json:"version"` } // we can send a GET to `/api/status` without auth, but it won't actually return version info. params := http.Header{} if client.APIKey != "" { v := "ApiKey " + base64.StdEncoding.EncodeToString([]byte(client.APIKey)) params.Add("Authorization", v) } ret, resp, err := client.Connection.Request("GET", "/api/status", nil, params, nil) if err != nil { return false, fmt.Errorf("error in HTTP request: %w", err) } respString := string(resp) if ret > http.StatusMultipleChoices { return false, fmt.Errorf("got invalid response code: %v (%s)", ret, respString) } status := apiStatus{} err = json.Unmarshal(resp, &status) if err != nil { return false, fmt.Errorf("error unmarshalling JSON: %w", err) } if status.Version.BuildFlavor == "serverless" { return true, nil } else { return false, nil } } func (client *Client) ImportMultiPartFormFile(url string, params url.Values, filename string, contents string) error { buf := &bytes.Buffer{} w := multipart.NewWriter(buf) pHeaders := textproto.MIMEHeader{} pHeaders.Add("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, filename)) pHeaders.Add("Content-Type", "application/ndjson") p, err := w.CreatePart(pHeaders) if err != nil { return fmt.Errorf("failed to create multipart writer for payload: %w", err) } _, err = io.Copy(p, strings.NewReader(contents)) if err != nil { return fmt.Errorf("failed to copy contents of the object: %w", err) } w.Close() // On serverless, special header is required to talk to this endpoint sendHeaders := http.Header{} sendHeaders.Add("Content-Type", w.FormDataContentType()) if serverless, _ := client.KibanaIsServerless(); serverless { sendHeaders.Add("x-elastic-internal-origin", "elastic-agent-libs") } statusCode, response, err := client.Connection.Request("POST", url, params, sendHeaders, buf) if err != nil { return fmt.Errorf("returned %d to import file: %w. Response: %s", statusCode, err, response) } if statusCode >= 300 { return fmt.Errorf("returned %d to import file. Response: %s", statusCode, response) } client.log.Debugf("Imported multipart file to %s with params %v", url, params) return nil } func (client *Client) Close() error { return nil } // truncateString returns a truncated string if the length is greater than 250 // runes. If the string is truncated "... (truncated)" is appended. Newlines are // replaced by spaces in the returned string. // // This function is useful for logging raw HTTP responses with errors when those // responses can be very large (such as an HTML page with CSS content). func truncateString(b []byte) string { const maxLength = 250 runes := bytes.Runes(b) if len(runes) > maxLength { runes = append(runes[:maxLength], []rune("... (truncated)")...) } return strings.Replace(string(runes), "\n", " ", -1) }