internal/serverless/client.go (235 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package serverless
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"time"
"github.com/elastic/elastic-package/internal/logger"
)
const (
defaultHostURL = "https://cloud.elastic.co"
projectsAPI = "/api/v1/serverless/projects"
)
type Client struct {
host string
apiKey string
}
// ClientOption is functional option modifying Serverless API client.
type ClientOption func(*Client)
var (
elasticCloudAPIKeyEnv = "EC_API_KEY"
elasticCloudEndpointEnv = "EC_HOST"
ErrProjectNotExist = errors.New("project does not exist")
)
func NewClient(opts ...ClientOption) (*Client, error) {
apiKey := os.Getenv(elasticCloudAPIKeyEnv)
if apiKey == "" {
return nil, fmt.Errorf("unable to obtain value from %s environment variable", elasticCloudAPIKeyEnv)
}
c := &Client{
host: defaultHostURL,
apiKey: apiKey,
}
for _, opt := range opts {
opt(c)
}
host := os.Getenv(elasticCloudEndpointEnv)
if host != "" {
c.host = host
}
logger.Debugf("Using Elastic Cloud URL: %s", c.host)
return c, nil
}
// WithAddress option sets the host to use to connect to Kibana.
func WithAddress(address string) ClientOption {
return func(c *Client) {
c.host = address
}
}
// WithApiKey option sets the host to use to connect to Kibana.
func WithApiKey(apiKey string) ClientOption {
return func(c *Client) {
c.apiKey = apiKey
}
}
func (c *Client) get(ctx context.Context, resourcePath string) (int, []byte, error) {
return c.sendRequest(ctx, http.MethodGet, resourcePath, nil)
}
func (c *Client) post(ctx context.Context, resourcePath string, body []byte) (int, []byte, error) {
return c.sendRequest(ctx, http.MethodPost, resourcePath, body)
}
func (c *Client) delete(ctx context.Context, resourcePath string) (int, []byte, error) {
return c.sendRequest(ctx, http.MethodDelete, resourcePath, nil)
}
func (c *Client) sendRequest(ctx context.Context, method, resourcePath string, body []byte) (int, []byte, error) {
request, err := c.newRequest(ctx, method, resourcePath, bytes.NewReader(body))
if err != nil {
return 0, nil, err
}
return c.doRequest(request)
}
func (c *Client) newRequest(ctx context.Context, method, resourcePath string, reqBody io.Reader) (*http.Request, error) {
base, err := url.Parse(c.host)
if err != nil {
return nil, fmt.Errorf("could not create base URL from host: %v: %w", c.host, err)
}
rel, err := url.Parse(resourcePath)
if err != nil {
return nil, fmt.Errorf("could not create relative URL from resource path: %v: %w", resourcePath, err)
}
u := base.JoinPath(rel.EscapedPath())
u.RawQuery = rel.RawQuery
logger.Debugf("%s %s", method, u)
req, err := http.NewRequestWithContext(ctx, method, u.String(), reqBody)
if err != nil {
return nil, fmt.Errorf("could not create %v request to Kibana API resource: %s: %w", method, resourcePath, err)
}
req.Header.Add("content-type", "application/json")
req.Header.Add("Authorization", fmt.Sprintf("ApiKey %s", c.apiKey))
return req, nil
}
func (c *Client) doRequest(request *http.Request) (int, []byte, error) {
client := http.Client{}
resp, err := client.Do(request)
if err != nil {
return 0, nil, fmt.Errorf("could not send request to Kibana API: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return resp.StatusCode, nil, fmt.Errorf("could not read response body: %w", err)
}
return resp.StatusCode, body, nil
}
func (c *Client) CreateProject(ctx context.Context, name, region, projectType string) (*Project, error) {
ReqBody := struct {
Name string `json:"name"`
RegionID string `json:"region_id"`
}{
Name: name,
RegionID: region,
}
p, err := json.Marshal(ReqBody)
if err != nil {
return nil, err
}
resourcePath, err := url.JoinPath(c.host, projectsAPI, projectType)
if err != nil {
return nil, fmt.Errorf("could not build the URL: %w", err)
}
statusCode, respBody, err := c.post(ctx, resourcePath, p)
if err != nil {
return nil, fmt.Errorf("error creating project: %w", err)
}
if statusCode != http.StatusCreated {
return nil, fmt.Errorf("unexpected status code %d, body: %s", statusCode, string(respBody))
}
serverlessProject := &Project{url: c.host, apiKey: c.apiKey}
err = json.Unmarshal(respBody, &serverlessProject)
if err != nil {
return nil, fmt.Errorf("error while decoding create project response: %w", err)
}
return serverlessProject, nil
}
func (c *Client) EnsureProjectInitialized(ctx context.Context, project *Project) error {
timer := time.NewTimer(time.Millisecond)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
}
status, err := c.StatusProject(ctx, project)
if err != nil {
logger.Debugf("error querying for status: %s", err.Error())
timer.Reset(time.Second * 5)
continue
}
if status != "initialized" {
logger.Debugf("project not initialized, status: %s", status)
timer.Reset(time.Second * 5)
continue
}
return nil
}
}
func (c *Client) StatusProject(ctx context.Context, project *Project) (string, error) {
resourcePath, err := url.JoinPath(c.host, projectsAPI, project.Type, project.ID, "status")
if err != nil {
return "", fmt.Errorf("could not build the URL: %w", err)
}
statusCode, respBody, err := c.get(ctx, resourcePath)
if err != nil {
return "", fmt.Errorf("error getting status project: %w", err)
}
if statusCode != http.StatusOK {
return "", fmt.Errorf("unexpected status code %d", statusCode)
}
var status struct {
Phase string `json:"phase"`
}
if err := json.Unmarshal(respBody, &status); err != nil {
return "", fmt.Errorf("unable to decode status: %w", err)
}
return status.Phase, nil
}
func (c *Client) DeleteProject(ctx context.Context, project *Project) error {
resourcePath, err := url.JoinPath(c.host, projectsAPI, project.Type, project.ID)
if err != nil {
return fmt.Errorf("could not build the URL: %w", err)
}
statusCode, _, err := c.delete(ctx, resourcePath)
if err != nil {
return fmt.Errorf("error deleting project: %w", err)
}
if statusCode != http.StatusOK {
return fmt.Errorf("unexpected status code %d", statusCode)
}
return nil
}
func (c *Client) GetProject(ctx context.Context, projectType, projectID string) (*Project, error) {
resourcePath, err := url.JoinPath(c.host, projectsAPI, projectType, projectID)
if err != nil {
return nil, fmt.Errorf("could not build the URL: %w", err)
}
statusCode, respBody, err := c.get(ctx, resourcePath)
if err != nil {
return nil, fmt.Errorf("error getting project: %w", err)
}
if statusCode == http.StatusNotFound {
return nil, ErrProjectNotExist
}
if statusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code %d", statusCode)
}
project := &Project{url: c.host, apiKey: c.apiKey}
err = json.Unmarshal(respBody, &project)
if err != nil {
return nil, fmt.Errorf("failed to decode project: %w", err)
}
return project, nil
}
func (c *Client) EnsureEndpoints(ctx context.Context, project *Project) error {
if project.Endpoints.Elasticsearch != "" {
return nil
}
for {
newProject, err := c.GetProject(ctx, project.Type, project.ID)
switch {
case err != nil:
logger.Debugf("request error: %s", err.Error())
case newProject.Endpoints.Elasticsearch != "":
project.Endpoints = newProject.Endpoints
return nil
}
logger.Debugf("Waiting for Elasticsearch endpoint for %s project %q", project.Type, project.ID)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second * 5):
}
}
}