functionaltests/internal/ecclient/client.go (208 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package ecclient
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"slices"
"strings"
"time"
"github.com/elastic/cloud-sdk-go/pkg/api"
"github.com/elastic/cloud-sdk-go/pkg/api/deploymentapi"
"github.com/elastic/cloud-sdk-go/pkg/auth"
"github.com/elastic/cloud-sdk-go/pkg/client/deployments"
"github.com/elastic/cloud-sdk-go/pkg/client/stack"
"github.com/elastic/cloud-sdk-go/pkg/models"
)
type Client struct {
ecAPI *api.API
endpoint string
}
type clientConfig struct {
httpClient *http.Client
}
func (o *clientConfig) initDefaults() {
o.httpClient = new(http.Client)
}
type ClientOption func(*clientConfig)
func WithHTTPClient(httpClient *http.Client) ClientOption {
return func(o *clientConfig) {
o.httpClient = httpClient
}
}
func New(endpoint string, apiKey string, options ...ClientOption) (*Client, error) {
cfg := clientConfig{}
cfg.initDefaults()
for _, o := range options {
o(&cfg)
}
if apiKey == "" {
return nil, fmt.Errorf("ecclient.New apiKey is required")
}
if endpoint == "" {
return nil, fmt.Errorf("ecclient.New endpoint is required")
}
ecAPI, err := api.NewAPI(api.Config{
AuthWriter: auth.APIKey(apiKey),
Client: cfg.httpClient,
Host: endpoint,
})
if err != nil {
return nil, fmt.Errorf("cannot create Elastic Cloud API client: %w", err)
}
return &Client{
ecAPI: ecAPI,
endpoint: endpoint,
}, nil
}
func (c *Client) RestartIntegrationServer(ctx context.Context, deploymentID string) error {
res, err := deploymentapi.Get(deploymentapi.GetParams{
API: c.ecAPI,
DeploymentID: deploymentID,
})
if err != nil {
return fmt.Errorf("cannot retrieve ref id of integrations server for deployment %s: %w", deploymentID, err)
}
refID := *res.Resources.IntegrationsServer[0].RefID
// https://www.elastic.co/docs/api/doc/cloud/operation/operation-restart-deployment-stateless-resource
url := fmt.Sprintf("%s/api/v1/deployments/%s/integrations_server/%s/_restart", c.endpoint, deploymentID, refID)
req, err := http.NewRequest(http.MethodPost, url, nil)
if err != nil {
return fmt.Errorf("cannot create integrations server restart request for deployment %s: %w", deploymentID, err)
}
req = c.ecAPI.AuthWriter.AuthRequest(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("cannot execute HTTP request for restarting deployment %s: %w", deploymentID, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
b, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("cannot read body after receiving a %d response while restarting integrations server: %w", resp.StatusCode, err)
}
return fmt.Errorf("restarting integrations server returned %d response with content: %s", resp.StatusCode, b)
}
// Wait until the integration server is back online.
status := func() (string, error) {
r, err := c.ecAPI.V1API.Deployments.GetDeploymentIntegrationsServerResourceInfo(
deployments.NewGetDeploymentIntegrationsServerResourceInfoParams().
WithDeploymentID(deploymentID).
WithRefID(refID),
c.ecAPI.AuthWriter)
if err != nil {
return "", err
}
return *r.Payload.Info.Status, nil
}
timeout := 10 * time.Minute
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
tctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
for {
select {
case <-tctx.Done():
return fmt.Errorf("timeout reached waiting for integrations server to restart")
case <-ticker.C:
s, err := status()
if err != nil {
return fmt.Errorf("cannot retrieve integrations server status: %w", err)
}
if s == "started" {
return nil
}
}
}
}
func (c *Client) getVersionInfos(
ctx context.Context,
region string,
showUnusable bool,
preFilter func(*models.StackVersionConfig) bool, // Filter before conversion
postFilter func(StackVersion) bool, // Filter after conversion
) (StackVersionInfos, error) {
showDeleted := false
resp, err := c.ecAPI.V1API.Stack.GetVersionStacks(
// Add region to get the stack versions for that region only
stack.NewGetVersionStacksParamsWithContext(api.WithRegion(ctx, region)).
WithShowDeleted(&showDeleted).
WithShowUnusable(&showUnusable),
c.ecAPI.AuthWriter,
)
if err != nil {
return nil, fmt.Errorf("cannot retrieve stack versions: %w", err)
}
if resp.Payload == nil || len(resp.Payload.Stacks) == 0 {
return nil, errors.New("stack versions response payload is empty")
}
versionInfos := make(StackVersionInfos, 0, len(resp.Payload.Stacks))
for _, s := range resp.Payload.Stacks {
if preFilter != nil && !preFilter(s) {
continue
}
v, err := NewStackVersionFromStr(s.Version)
if err != nil {
return nil, fmt.Errorf("cannot parse stack version '%v': %w", s.Version, err)
}
if postFilter == nil || postFilter(v) {
upgradableTo, err := sortedStackVersionsForStrs(s.UpgradableTo)
if err != nil {
return nil, fmt.Errorf("cannot parse upgradable to for '%v': %w", s.Version, err)
}
versionInfos = append(versionInfos, StackVersionInfo{
Version: v,
UpgradableTo: upgradableTo,
})
}
}
versionInfos.Sort()
return versionInfos, nil
}
func sortedStackVersionsForStrs(strs []string) ([]StackVersion, error) {
versions := make([]StackVersion, 0, len(strs))
for _, s := range strs {
v, err := NewStackVersionFromStr(s)
if err != nil {
return nil, err
}
versions = append(versions, v)
}
slices.SortFunc(versions, func(a, b StackVersion) int {
return a.Compare(b)
})
return versions, nil
}
// GetVersionInfos retrieves all stack version infos without suffix.
func (c *Client) GetVersionInfos(ctx context.Context, region string) (StackVersionInfos, error) {
postFilter := func(v StackVersion) bool {
// Ignore all with suffix e.g. SNAPSHOTS, BC1
return v.Suffix == ""
}
versions, err := c.getVersionInfos(ctx, region, false, nil, postFilter)
if err != nil {
return nil, fmt.Errorf("get versions failed: %w", err)
}
return versions, nil
}
// GetSnapshotVersionInfos retrieves all stack version infos with the suffix "SNAPSHOT".
func (c *Client) GetSnapshotVersionInfos(ctx context.Context, region string) (StackVersionInfos, error) {
postFilter := func(v StackVersion) bool {
// Only keep SNAPSHOTs
return v.Suffix == "SNAPSHOT"
}
versions, err := c.getVersionInfos(ctx, region, true, nil, postFilter)
if err != nil {
return nil, fmt.Errorf("get snapshot versions failed: %w", err)
}
return versions, nil
}
// GetCandidateVersionInfos retrieves all stack version infos that are potential build / release candidates.
func (c *Client) GetCandidateVersionInfos(ctx context.Context, region string) (StackVersionInfos, error) {
preFilter := func(v *models.StackVersionConfig) bool {
// For BCs and SNAPSHOTs, the `docker_image` will have a suffix e.g.:
// docker.elastic.co/cloud-release/elasticsearch-cloud-ess:8.18.0-928cac41
// As compared to released:
// docker.elastic.co/cloud-release/elasticsearch-cloud-ess:8.17.3
version := strings.Split(*v.Elasticsearch.DockerImage, ":")[1]
splits := strings.Split(version, "-")
// Has suffix and the suffix is not empty / 1 character (older versions have this)
return len(splits) >= 2 && len(splits[1]) > 1
}
postFilter := func(v StackVersion) bool {
// Ignore SNAPSHOTs
return v.Suffix != "SNAPSHOT"
}
versions, err := c.getVersionInfos(ctx, region, false, preFilter, postFilter)
if err != nil {
return nil, fmt.Errorf("get candidate versions failed: %w", err)
}
return versions, nil
}