in pkg/client/client.go [345:470]
func (c *client) actionRoundTrip(actionResults chan *proto.ActionResponse) {
actionsCtx, actionsCancel := context.WithCancel(c.ctx)
defer actionsCancel()
actionsClient, err := c.client.Actions(actionsCtx)
if err != nil {
c.impl.OnError(err)
return
}
var actionsWG sync.WaitGroup
done := make(chan bool)
// action requests
actionsWG.Add(1)
go func() {
defer actionsWG.Done()
for {
action, err := actionsClient.Recv()
if err != nil {
if !errors.Is(err, io.EOF) {
c.impl.OnError(err)
}
close(done)
return
}
c.amx.RLock()
actionImpl, ok := c.actions[action.Name]
c.amx.RUnlock()
if !ok {
actionResults <- &proto.ActionResponse{
Token: c.token,
Id: action.Id,
Status: proto.ActionResponse_FAILED,
Result: ActionErrUndefined,
}
continue
}
var params map[string]interface{}
err = json.Unmarshal(action.Params, ¶ms)
if err != nil {
actionResults <- &proto.ActionResponse{
Token: c.token,
Id: action.Id,
Status: proto.ActionResponse_FAILED,
Result: ActionErrUnmarshableParams,
}
continue
}
// perform the action
go func() {
res, err := actionImpl.Execute(c.ctx, params)
if err != nil {
actionResults <- &proto.ActionResponse{
Token: c.token,
Id: action.Id,
Status: proto.ActionResponse_FAILED,
Result: utils.JSONMustMarshal(map[string]string{
"error": err.Error(),
}),
}
return
}
resBytes, err := json.Marshal(res)
if err != nil {
// client-side error, should have been marshal-able
c.impl.OnError(err)
actionResults <- &proto.ActionResponse{
Token: c.token,
Id: action.Id,
Status: proto.ActionResponse_FAILED,
Result: ActionErrUnmarshableResult,
}
return
}
actionResults <- &proto.ActionResponse{
Token: c.token,
Id: action.Id,
Status: proto.ActionResponse_SUCCESS,
Result: resBytes,
}
}()
}
}()
// action responses
actionsWG.Add(1)
go func() {
defer actionsWG.Done()
// initial connection of stream must send the token so
// the Elastic Agent knows this clients token.
err := actionsClient.Send(&proto.ActionResponse{
Token: c.token,
Id: ActionResponseInitID,
Status: proto.ActionResponse_SUCCESS,
Result: []byte("{}"),
})
if err != nil {
c.impl.OnError(err)
return
}
for {
select {
case <-done:
return
case res := <-actionResults:
err := actionsClient.Send(res)
if err != nil {
// failed to send, add back to response to try again
actionResults <- res
c.impl.OnError(err)
return
}
}
}
}()
// wait for both send and recv go routines to stop before
// starting a new stream.
actionsWG.Wait()
actionsClient.CloseSend()
}