dev-tools/v2tool/manager/manager.go (194 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package manager
import (
"fmt"
"os"
"os/exec"
"time"
"gopkg.in/yaml.v2"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/pkg/core/process"
)
// InputManager carries all the logic needed run a V2 client,
// including process handles, and the logic for sending/updating units
type InputManager struct {
logger *logp.Logger
client *process.Info
Units []Unit
clientArgs []string
}
// Unit represents a single V2 config unit as tracked across the lifetime of a test run with the tool.
type Unit struct {
Rules RulesCfg
done bool
State *proto.UnitExpected
}
// InputManagerFromCfg creates the input Manager from a path to a yaml config
func InputManagerFromCfg(cfgPath string) (*InputManager, error) {
fileData, err := os.ReadFile(cfgPath)
if err != nil {
return nil, fmt.Errorf("error reading config from file %s: %w", cfgPath, err)
}
inputMgrCfg := Config{}
err = yaml.Unmarshal(fileData, &inputMgrCfg)
if err != nil {
return nil, fmt.Errorf("error unmarshalling yaml for file %s: %w", cfgPath, err)
}
// restructure the config into the InputManager
units := []Unit{}
for _, input := range inputMgrCfg.Inputs.Inputs {
ruleCfg, ok := inputMgrCfg.Rules[input.Id]
if !ok {
keys := make([]string, 0, len(inputMgrCfg.Rules))
for k := range inputMgrCfg.Rules {
keys = append(keys, k)
}
return nil, fmt.Errorf("could not find rules for input with ID '%s', available rules are %v", input.Id, keys)
}
unit, err := newUnit(input, ruleCfg)
if err != nil {
return nil, fmt.Errorf("Error adding unit of ID %s: %w", input.Id, err)
}
units = append(units, unit)
}
return &InputManager{logger: logp.L().Named("input-manager"), Units: units, clientArgs: inputMgrCfg.Args}, nil
}
// StartInputProcess starts the V2 client
func (in *InputManager) StartInputProcess(path string) error {
in.logger.Debugf("Client args are: %v", in.clientArgs)
proc, err := process.Start(path, process.WithArgs(in.clientArgs), process.WithCmdOptions(attachOutErr))
if err != nil {
return fmt.Errorf("error starting process from path %s: %w", path, err)
}
in.client = proc
return nil
}
// WriteToClient writes a byte string (usually the V2 conn info)
func (in *InputManager) WriteToClient(info []byte) error {
_, err := in.client.Stdin.Write(info)
if err != nil {
return fmt.Errorf("failed to write connection information: %w", err)
}
// if you remove this the V2 client won't get an EOF, be careful
in.client.Stdin.Close()
return nil
}
// WaitForClientClose blocks until the client process ends
func (in *InputManager) WaitForClientClose() {
if in.client == nil {
return
}
waitChan := in.client.Wait()
for {
select {
case end := <-waitChan:
in.logger.Debugf("Got client stop for pid %d, status %d", end.Pid(), end.ExitCode())
return
}
}
}
func attachOutErr(cmd *exec.Cmd) error {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
return nil
}
func newUnit(expected *proto.UnitExpectedConfig, rules RulesCfg) (Unit, error) {
expectedStart := generateUnitExpected(expected)
return Unit{State: expectedStart, Rules: rules}, nil
}
// Checkin is called every time the Mock V2 server performs a client checkin.
// It updates the units based on the current state, and returns a new expected config
func (in *InputManager) Checkin(observed *proto.CheckinObserved, started time.Time) *proto.CheckinExpected {
base := &proto.CheckinExpected{
AgentInfo: &proto.AgentInfo{
Id: "test-agent",
Version: "8.4.0",
Snapshot: true,
},
}
// Client is starting up, no units are configured yet
if len(observed.Units) == 0 {
in.logger.Debugf("No units found, sending starting configuration")
base.Units = in.startupConfig(started)
} else {
// we got some kind of units, update instead
base.Units = in.configFromObserved(started, observed)
}
return base
}
// configFromObserved handles the logic of updating expected units based on our current state
func (in *InputManager) configFromObserved(started time.Time, obs *proto.CheckinObserved) []*proto.UnitExpected {
// any units not in this array will be removed by the server
units := []*proto.UnitExpected{}
for iter, v := range in.Units {
obsUnit, foundObs := findObserved(v.State.Id, obs.Units)
// Case, the unit has already been asked to stop
if v.done {
if foundObs {
if obsUnit.State == proto.State_STOPPED { // case: The unit has been marked as done, and now stopped
in.logger.Debugf("Unit %s marked as done by v2tool, found in state %; removing", v.State.Id, obsUnit.State.String())
continue
}
} else { // case: The unit is done, and has been completely removed
continue
}
}
if foundObs && obsUnit.State == proto.State_STOPPED {
in.logger.Warnf("Unit %s was not explicitly stopped, but is now stopped; removing")
continue
}
// unit has previously been sent to client
if foundObs {
if obsUnit.State == proto.State_FAILED {
in.logger.Debugf("Unit %s has been marked as failed; removing", v.State.Id)
continue
}
// Do we no want to stop?
// The check functions are responsible for checking the actual status of the state
if !v.done && v.Rules.Stop.Check.Check(started, obsUnit) {
in.logger.Debugf("Unit %s will be marked as STOPPED", v.State.Id)
in.Units[iter].State.State = proto.State_STOPPED
in.Units[iter].State.ConfigStateIdx++
units = append(units, in.Units[iter].State)
in.Units[iter].done = true
} else {
// unit will continue as normal
units = append(units, in.Units[iter].State)
}
} else {
// unit doesn't exist, do we want to start?
if v.Rules.Start.Check.Check(started, nil) {
in.logger.Debugf("Unit %s will be marked as STARTING")
units = append(units, in.Units[iter].State)
}
}
}
return units
}
// returns the unit config for a V2 client's first startup
func (in *InputManager) startupConfig(started time.Time) []*proto.UnitExpected {
units := []*proto.UnitExpected{}
for iter, v := range in.Units {
if !v.Rules.Start.Check.Check(started, nil) {
continue
}
in.logger.Debugf("Unit with ID %s will run at startup", in.Units[iter].State.Id)
// assume the starting input config is fine
units = append(units, in.Units[iter].State)
}
return units
}
// helper to return an observed unit state based on an ID
func findObserved(id string, units []*proto.UnitObserved) (*proto.UnitObserved, bool) {
for _, unit := range units {
if unit.Id == id {
return unit, true
}
}
return nil, false
}
// helper to turn a unit config into an entire UnitExpected struct used by the MockV2 server
func generateUnitExpected(cfg *proto.UnitExpectedConfig) *proto.UnitExpected {
return &proto.UnitExpected{
Id: cfg.Id,
Type: proto.UnitType_INPUT,
ConfigStateIdx: 0,
Config: cfg,
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_DEBUG,
}
}
// helper to form an entire CheckinExpected structure
func createUnitsWithState(state proto.State, input *proto.UnitExpectedConfig, inID string, stateIndex uint64) *proto.CheckinExpected {
return &proto.CheckinExpected{
AgentInfo: &proto.AgentInfo{
Id: "test-agent",
Version: "8.4.0",
Snapshot: true,
},
Units: []*proto.UnitExpected{
{
Id: inID,
Type: proto.UnitType_INPUT,
ConfigStateIdx: stateIndex,
Config: input,
State: state,
LogLevel: proto.UnitLogLevel_DEBUG,
},
},
}
}