internal/kibana/agents.go (150 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package kibana
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"time"
"github.com/elastic/elastic-package/internal/logger"
)
// Polling parameters used by waitUntilPolicyAssigned. They are declared as
// variables rather than constants (presumably so tests can shorten them —
// TODO confirm).
var (
	// waitForPolicyAssignedTimeout bounds the total time spent waiting for
	// an agent to report the expected policy.
	waitForPolicyAssignedTimeout = time.Minute * 10
	// waitForPolicyAssignedRetryPeriod is the pause between two consecutive
	// checks of the agent.
	waitForPolicyAssignedRetryPeriod = time.Second * 2
)
// Agent represents an Elastic Agent enrolled with fleet.
//
// The JSON tags are the keys used when decoding Fleet agent API responses
// (see QueryAgents and getAgent) and when String marshals the agent.
type Agent struct {
// ID identifies the agent; it is interpolated into the reassign, unenroll
// and single-agent request paths built in this file.
ID string `json:"id"`
// PolicyID is compared against the requested policy ID while waiting for a
// reassignment to complete.
PolicyID string `json:"policy_id"`
// PolicyRevision is compared against the requested policy revision while
// waiting for a reassignment. Because of omitempty, a zero value is left
// out when the agent is marshaled.
PolicyRevision int `json:"policy_revision,omitempty"`
// LocalMetadata holds the parts of the "local_metadata" object modeled
// here: the host name and the agent log level. Within the code shown they
// are only written to debug logs.
LocalMetadata struct {
Host struct {
Name string `json:"name"`
} `json:"host"`
Elastic struct {
Agent struct {
LogLevel string `json:"log_level"`
} `json:"agent"`
} `json:"elastic"`
} `json:"local_metadata"`
// Status is decoded from the "status" key; within the code shown it is only
// logged. NOTE(review): the set of possible values is not visible here.
Status string `json:"status"`
}
// String renders the agent as its JSON encoding. If the encoding fails, the
// text of the marshaling error is returned instead.
func (a *Agent) String() string {
	encoded, err := json.Marshal(a)
	if err == nil {
		return string(encoded)
	}
	return err.Error()
}
// ListAgents returns the agents enrolled with Fleet. It delegates to
// QueryAgents with an empty kuery, i.e. without any filter.
func (c *Client) ListAgents(ctx context.Context) ([]Agent, error) {
	const noFilter = ""
	agents, err := c.QueryAgents(ctx, noFilter)
	return agents, err
}
// QueryAgents returns the agents enrolled with Fleet that match the given
// kibana query. An empty kuery sends the request without a query string.
//
// The response is decoded with both a "list" and an "items" key: clients
// with a known version below 9 return "list", every other client (including
// one whose version is unknown) returns "items".
func (c *Client) QueryAgents(ctx context.Context, kuery string) ([]Agent, error) {
	endpoint := fmt.Sprintf("%s/agents", FleetAPI)
	if kuery != "" {
		query := url.Values{"kuery": []string{kuery}}
		endpoint += "?" + query.Encode()
	}

	status, body, err := c.get(ctx, endpoint)
	if err != nil {
		return nil, fmt.Errorf("could not list agents: %w", err)
	}
	if status != http.StatusOK {
		return nil, fmt.Errorf("could not list agents; API status code = %d; response body = %s", status, body)
	}

	var payload struct {
		List  []Agent `json:"list"`
		Items []Agent `json:"items"`
	}
	if err := json.Unmarshal(body, &payload); err != nil {
		return nil, fmt.Errorf("could not convert list agents (response) to JSON: %w", err)
	}

	// Pick the response key according to the Kibana major version.
	if c.semver != nil && c.semver.Major() < 9 {
		return payload.List, nil
	}
	return payload.Items, nil
}
// AssignPolicyToAgent assigns the given Policy to the given Agent and then
// blocks until the agent reports that policy (see waitUntilPolicyAssigned).
//
// The reassign request is sent with PUT when the client has a known version
// below 9, and with POST otherwise. An error is returned if the request
// cannot be encoded or sent, if the API answers with a status other than
// 200, or if waiting for the assignment fails or times out.
func (c *Client) AssignPolicyToAgent(ctx context.Context, a Agent, p Policy) error {
	// Encode the body with encoding/json rather than string concatenation so
	// that a policy ID containing quotes or backslashes cannot produce
	// malformed JSON.
	reqBody, err := json.Marshal(struct {
		PolicyID string `json:"policy_id"`
	}{PolicyID: p.ID})
	if err != nil {
		return fmt.Errorf("could not encode policy reassignment request: %w", err)
	}

	path := fmt.Sprintf("%s/agents/%s/reassign", FleetAPI, a.ID)

	var statusCode int
	var respBody []byte
	switch {
	case c.semver != nil && c.semver.Major() < 9:
		statusCode, respBody, err = c.put(ctx, path, reqBody)
	default:
		statusCode, respBody, err = c.post(ctx, path, reqBody)
	}
	if err != nil {
		return fmt.Errorf("could not assign policy to agent: %w", err)
	}
	if statusCode != http.StatusOK {
		return fmt.Errorf("could not assign policy to agent; API status code = %d; response body = %s", statusCode, respBody)
	}

	if err := c.waitUntilPolicyAssigned(ctx, a, p); err != nil {
		return fmt.Errorf("error occurred while waiting for the policy to be assigned to all agents: %w", err)
	}
	return nil
}
// RemoveAgent unenrolls the given agent.
//
// It posts a request with "revoke" and "force" both set to true to the
// agent's unenroll endpoint, and returns an error if the request fails or
// the API answers with a status other than 200.
func (c *Client) RemoveAgent(ctx context.Context, a Agent) error {
	reqBody := `{ "revoke": true, "force": true }`
	path := fmt.Sprintf("%s/agents/%s/unenroll", FleetAPI, a.ID)

	statusCode, respBody, err := c.post(ctx, path, []byte(reqBody))
	if err != nil {
		return fmt.Errorf("could not unenroll agent: %w", err)
	}
	if statusCode != http.StatusOK {
		return fmt.Errorf("could not unenroll agent; API status code = %d; response body = %s", statusCode, respBody)
	}
	return nil
}
// waitUntilPolicyAssigned polls the agent until it reports policy p, i.e. its
// policy ID equals p.ID and its policy revision is at least p.Revision.
//
// The agent is checked immediately and then once per
// waitForPolicyAssignedRetryPeriod. The whole wait is bounded by
// waitForPolicyAssignedTimeout on top of any deadline already carried by ctx.
// It returns nil once the policy is assigned, a timeout error when the
// deadline is exceeded while waiting between checks, the context error for
// any other cancellation, or a wrapped error if the agent cannot be fetched.
func (c *Client) waitUntilPolicyAssigned(ctx context.Context, a Agent, p Policy) error {
	ctx, cancel := context.WithTimeout(ctx, waitForPolicyAssignedTimeout)
	defer cancel()

	retry := time.NewTicker(waitForPolicyAssignedRetryPeriod)
	defer retry.Stop()

	for {
		current, err := c.getAgent(ctx, a.ID)
		if err != nil {
			return fmt.Errorf("can't get the agent: %w", err)
		}
		logger.Debugf("Agent %s (Host: %s): Policy ID %s LogLevel: %s Status: %s",
			current.ID,
			current.LocalMetadata.Host.Name,
			current.PolicyID,
			current.LocalMetadata.Elastic.Agent.LogLevel,
			current.Status)

		assigned := current.PolicyID == p.ID && current.PolicyRevision >= p.Revision
		if assigned {
			logger.Debugf("Policy revision assigned to the agent (ID: %s)...", a.ID)
			return nil
		}

		logger.Debugf("Wait until the policy (ID: %s, revision: %d) is assigned to the agent (ID: %s)...", p.ID, p.Revision, a.ID)

		select {
		case <-retry.C:
			// Next polling round.
		case <-ctx.Done():
			cause := ctx.Err()
			if errors.Is(cause, context.DeadlineExceeded) {
				return errors.New("timeout: policy hasn't been assigned in time")
			}
			return cause
		}
	}
}
// getAgent fetches a single agent by ID from the Fleet agents endpoint and
// returns the value found under the response's "item" key.
//
// It returns an error if the request fails, if the API answers with a
// status other than 200, or if the response body cannot be decoded.
func (c *Client) getAgent(ctx context.Context, agentID string) (*Agent, error) {
	statusCode, respBody, err := c.get(ctx, fmt.Sprintf("%s/agents/%s", FleetAPI, agentID))
	if err != nil {
		return nil, fmt.Errorf("could not get agent: %w", err)
	}
	if statusCode != http.StatusOK {
		return nil, fmt.Errorf("could not get agent; API status code = %d; response body = %s", statusCode, respBody)
	}

	var resp struct {
		Item Agent `json:"item"`
	}
	if err := json.Unmarshal(respBody, &resp); err != nil {
		return nil, fmt.Errorf("could not convert get agent (response) to JSON: %w", err)
	}
	return &resp.Item, nil
}