func()

in pkg/client/client.go [345:470]


// actionRoundTrip runs one lifetime of the bidirectional Actions stream.
//
// It opens the stream and then runs two goroutines:
//
//   - the receive goroutine reads action requests, looks up the registered
//     action by name and executes it in its own goroutine; every outcome
//     (success or failure) is written to actionResults.
//   - the send goroutine first sends the client token and then forwards every
//     response read from actionResults over the stream.
//
// The function returns once both goroutines have stopped. actionResults is
// supplied by the caller; responses that could not be sent are put back on it
// so that a later stream can deliver them. Errors are reported through
// c.impl.OnError.
func (c *client) actionRoundTrip(actionResults chan *proto.ActionResponse) {
	actionsCtx, actionsCancel := context.WithCancel(c.ctx)
	defer actionsCancel()
	actionsClient, err := c.client.Actions(actionsCtx)
	if err != nil {
		c.impl.OnError(err)
		return
	}

	var actionsWG sync.WaitGroup
	// done is closed by the receive goroutine when Recv fails, which tells
	// the send goroutine to stop as well.
	done := make(chan struct{})

	// action requests
	actionsWG.Add(1)
	go func() {
		defer actionsWG.Done()
		for {
			action, err := actionsClient.Recv()
			if err != nil {
				// io.EOF is not reported through OnError; every other
				// receive error is.
				if !errors.Is(err, io.EOF) {
					c.impl.OnError(err)
				}
				close(done)
				return
			}

			c.amx.RLock()
			actionImpl, ok := c.actions[action.Name]
			c.amx.RUnlock()
			if !ok {
				// no action registered under the requested name
				actionResults <- &proto.ActionResponse{
					Token:  c.token,
					Id:     action.Id,
					Status: proto.ActionResponse_FAILED,
					Result: ActionErrUndefined,
				}
				continue
			}

			var params map[string]interface{}
			err = json.Unmarshal(action.Params, &params)
			if err != nil {
				// parameters are not valid JSON for a map
				actionResults <- &proto.ActionResponse{
					Token:  c.token,
					Id:     action.Id,
					Status: proto.ActionResponse_FAILED,
					Result: ActionErrUnmarshableParams,
				}
				continue
			}

			// perform the action; it runs in its own goroutine so the
			// receive loop can keep reading requests while it executes.
			go func() {
				res, err := actionImpl.Execute(c.ctx, params)
				if err != nil {
					actionResults <- &proto.ActionResponse{
						Token:  c.token,
						Id:     action.Id,
						Status: proto.ActionResponse_FAILED,
						Result: utils.JSONMustMarshal(map[string]string{
							"error": err.Error(),
						}),
					}
					return
				}
				resBytes, err := json.Marshal(res)
				if err != nil {
					// client-side error, should have been marshal-able
					c.impl.OnError(err)
					actionResults <- &proto.ActionResponse{
						Token:  c.token,
						Id:     action.Id,
						Status: proto.ActionResponse_FAILED,
						Result: ActionErrUnmarshableResult,
					}
					return
				}
				actionResults <- &proto.ActionResponse{
					Token:  c.token,
					Id:     action.Id,
					Status: proto.ActionResponse_SUCCESS,
					Result: resBytes,
				}
			}()
		}
	}()

	// action responses
	actionsWG.Add(1)
	go func() {
		defer actionsWG.Done()

		// initial connection of stream must send the token so
		// the Elastic Agent knows this clients token.
		err := actionsClient.Send(&proto.ActionResponse{
			Token:  c.token,
			Id:     ActionResponseInitID,
			Status: proto.ActionResponse_SUCCESS,
			Result: []byte("{}"),
		})
		if err != nil {
			c.impl.OnError(err)
			return
		}

		for {
			select {
			case <-done:
				return
			case res := <-actionResults:
				if err := actionsClient.Send(res); err != nil {
					// failed to send, add back to response to try again.
					//
					// This goroutine is the only reader of actionResults
					// in this round trip, so a plain blocking send here
					// would hang forever when the channel is unbuffered
					// or full, and actionsWG.Wait() would never return.
					// Try a non-blocking send first; otherwise hand the
					// response to a helper goroutine that waits for a
					// reader or for the client context to end.
					select {
					case actionResults <- res:
					default:
						go func() {
							select {
							case actionResults <- res:
							case <-c.ctx.Done():
							}
						}()
					}
					c.impl.OnError(err)
					return
				}
			}
		}
	}()

	// wait for both send and recv go routines to stop before
	// starting a new stream.
	actionsWG.Wait()
	// The receive goroutine only stops after Recv returned an error, so the
	// stream has already ended here; the CloseSend result is deliberately
	// ignored.
	_ = actionsClient.CloseSend()
}