functionaltests/internal/kbclient/client.go (189 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Package kbclient implements a Kibana HTTP API client to perform operation needed by functional tests.
package kbclient
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/elastic/apm-server/functionaltests/internal/ecclient"
)
func New(kibanaURL, username, password string) (*Client, error) {
if kibanaURL == "" {
return nil, fmt.Errorf("kbclient.New kibanaURL must not be empty")
}
return &Client{
url: kibanaURL,
superUsername: username,
superPassword: password,
SupportedAPIVersion: "2023-10-31",
}, nil
}
// Client is a wrapped HTTP Client with custom Kibana related methods.
type Client struct {
http.Client
// url is the Kibana URL where requests will be directed to.
url string
// Fleet API access require superuser role before 8.1.0.
// superUsername should be an Elasticsearch superuser username.
superUsername string
// superPassword should be an Elasticsearch superuser password.
superPassword string
SupportedAPIVersion string
}
// prepareRequest creates a http.Request with required headers for interacting with Kibana.
func (c *Client) prepareRequest(method, path string, body any) (*http.Request, error) {
b, err := json.Marshal(body)
if err != nil {
return nil, fmt.Errorf("cannot marshal body: %w", err)
}
url := fmt.Sprintf("%s%s", c.url, path)
req, err := http.NewRequest(method, url, bytes.NewReader(b))
if err != nil {
return nil, fmt.Errorf("cannot create http request: %w", err)
}
req.Header.Add("kbn-xsrf", "true")
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Elastic-Api-Version", c.SupportedAPIVersion)
req.Header.Add("X-Elastic-Internal-Origin", "Kibana")
userPass := fmt.Sprintf("%s:%s", c.superUsername, c.superPassword)
basicAuth := base64.StdEncoding.EncodeToString([]byte(userPass))
req.Header.Add("Authorization", fmt.Sprintf("Basic %s", basicAuth))
return req, nil
}
// sendRequest sends a http.Request with the provided method, path and body to Kibana
// and returns the response body bytes if the status code is 200.
// It will also handle non-200 status codes accordingly, if the handler is defined.
func (c *Client) sendRequest(
ctx context.Context,
method string,
path string,
body any,
handleRespError func(statusCode int, body []byte) error,
) ([]byte, error) {
methodPath := fmt.Sprintf("%s %s", method, path)
req, err := c.prepareRequest(method, path, body)
if err != nil {
return nil, fmt.Errorf("cannot prepare request (%s): %w", methodPath, err)
}
req = req.WithContext(ctx)
resp, err := c.Do(req)
if err != nil {
return nil, fmt.Errorf("cannot perform http request (%s): %w", methodPath, err)
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("cannot read response body (%s): %w", methodPath, err)
}
if resp.StatusCode != 200 {
if handleRespError != nil {
if err = handleRespError(resp.StatusCode, b); err != nil {
return nil, err
}
}
errMsg := fmt.Sprintf("request (%s) failed with status code %d", methodPath, resp.StatusCode)
if len(b) > 0 {
return nil, fmt.Errorf("%s, body: %s", errMsg, string(b))
}
return nil, errors.New(errMsg)
}
return b, nil
}
type ElasticAgentPolicyNotFoundError struct {
Name string
}
func (e ElasticAgentPolicyNotFoundError) Error() string {
return fmt.Sprintf("ElasticAgentPolicy named %s was not found", e.Name)
}
type PackagePolicy struct {
Name string `json:"name"`
Description string `json:"description"`
Package PackagePolicyPkg `json:"package"`
Namespace string `json:"namespace"`
Inputs []PackagePolicyInput `json:"inputs"`
OutputID string `json:"output_id"`
PolicyID string `json:"policy_id,omitempty"`
PolicyIDs []string `json:"policy_ids,omitempty"`
Enabled bool `json:"enabled"`
}
type PackagePolicyPkg struct {
Name string `json:"name"`
Title string `json:"title"`
Version string `json:"version"`
}
type PackagePolicyInput struct {
Config any `json:"config"`
Enabled bool `json:"enabled"`
Streams []any `json:"streams"`
Type string `json:"type"`
Vars map[string]any `json:"vars"`
KeepEnabled bool `json:"keep_enabled"`
PolicyTemplate string `json:"policy_template"`
}
type getPackagePolicyResponse struct {
Item PackagePolicy `json:"item"`
}
// GetPackagePolicyByID retrieves the Package Policy specified by policyID.
// https://www.elastic.co/docs/api/doc/kibana/v8/operation/operation-get-package-policy
func (c *Client) GetPackagePolicyByID(ctx context.Context, policyID string) (PackagePolicy, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
path := fmt.Sprintf("/api/fleet/package_policies/%s", policyID)
handleRespError := func(statusCode int, _ []byte) error {
if statusCode == 404 {
return &ElasticAgentPolicyNotFoundError{Name: policyID}
}
return nil
}
b, err := c.sendRequest(ctx, http.MethodGet, path, nil, handleRespError)
if err != nil {
return PackagePolicy{}, err
}
var policyResp getPackagePolicyResponse
if err = json.Unmarshal(b, &policyResp); err != nil {
return PackagePolicy{}, fmt.Errorf("cannot unmarshal response body: %w", err)
}
return policyResp.Item, nil
}
// UpdatePackagePolicyByID performs a Package Policy update in Fleet through the Fleet Kibana APIs.
// https://www.elastic.co/docs/api/doc/kibana/v8/operation/operation-update-package-policy
func (c *Client) UpdatePackagePolicyByID(ctx context.Context, policyID string, policy PackagePolicy) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
path := fmt.Sprintf("/api/fleet/package_policies/%s", policyID)
_, err := c.sendRequest(ctx, http.MethodPut, path, policy, nil)
return err
}
// UpdatePackagePolicyDescriptionByID updates a Package Policy description through the Fleet Kibana APIs.
func (c *Client) UpdatePackagePolicyDescriptionByID(
ctx context.Context,
policyID string,
version ecclient.StackVersion,
description string,
) error {
policy, err := c.GetPackagePolicyByID(ctx, policyID)
if err != nil {
return fmt.Errorf("cannot get elastic-cloud-apm package policy: %w", err)
}
// If the package policy version returned from API does not match with
// expected version, set it ourselves to hopefully circumvent it.
// Relevant issue: https://github.com/elastic/kibana/issues/215437.
//
// NOTE: We check that the version in the package policy has the same
// Major.Minor version as we expect (instead of whole version string),
// because 7.17.x somehow has package policy version of 7.17.0.
if !strings.HasPrefix(policy.Package.Version, version.MajorMinor()) {
// Set the expected version for this update.
policy.Package.Version = version.String()
}
policy.Description = description
if err = c.UpdatePackagePolicyByID(ctx, policyID, policy); err != nil {
return fmt.Errorf("cannot update elastic-cloud-apm package policy: %w", err)
}
return nil
}
type enableIntegrationsResponse struct {
CloudApmPackagePolicy struct {
Enabled bool `json:"enabled"`
} `json:"cloudApmPackagePolicy"`
}
// EnableIntegrationsServer enables integrations server to add combined APM and Fleet Server to the deployment.
// https://www.elastic.co/guide/en/cloud/current/ec-manage-integrations-server.html
func (c *Client) EnableIntegrationsServer(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
// This is an internal API that is not publicly documented.
// https://github.com/elastic/kibana/blob/12aa3fc/x-pack/solutions/observability/plugins/apm/server/routes/fleet/route.ts#L146
path := "/internal/apm/fleet/cloud_apm_package_policy"
b, err := c.sendRequest(ctx, http.MethodPost, path, nil, nil)
if err != nil {
return err
}
var resp enableIntegrationsResponse
if err = json.Unmarshal(b, &resp); err != nil {
return fmt.Errorf("cannot unmarshal response body: %w", err)
}
if !resp.CloudApmPackagePolicy.Enabled {
return errors.New("failed to enable integrations server")
}
return nil
}