pkg/client/mock/stub_serverV1.go (151 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package mock import ( "encoding/json" "fmt" "net" "sync" "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/gofrs/uuid/v5" "google.golang.org/grpc" ) // StubServerCheckin is used by the StubServer type StubServerCheckin func(*proto.StateObserved) *proto.StateExpected // StubServerAction is used by the StubServer type StubServerAction func(*proto.ActionResponse) error // PerformAction is the stubbed action type for the mocked server type PerformAction struct { Type proto.ActionRequest_Type Name string Params []byte Callback func(map[string]interface{}, error) DiagCallback func([]*proto.ActionDiagnosticUnitResult, error) UnitID string UnitType proto.UnitType Level proto.ActionRequest_Level } type actionResultCh struct { Result map[string]interface{} Diag []*proto.ActionDiagnosticUnitResult Err error } // StubServer is the type that mocks an elastic agent controller server type StubServer struct { proto.UnimplementedElasticAgentServer Port int CheckinImpl StubServerCheckin ActionImpl StubServerAction server *grpc.Server ActionsChan chan *PerformAction SentActions map[string]*PerformAction } // Start the sub V2 server func (s *StubServer) Start(opt ...grpc.ServerOption) error { lis, err := net.Listen("tcp", ":0") if err != nil { return err } s.Port = lis.Addr().(*net.TCPAddr).Port srv := grpc.NewServer(opt...) s.server = srv proto.RegisterElasticAgentServer(s.server, s) go func() { srv.Serve(lis) }() return nil } // Stop the stub V1 server func (s *StubServer) Stop() { if s.server != nil { s.server.Stop() s.server = nil } } // Checkin implementaiton for the stub server func (s *StubServer) Checkin(server proto.ElasticAgent_CheckinServer) error { for { checkin, err := server.Recv() if err != nil { return err } resp := s.CheckinImpl(checkin) if resp == nil { // close connection to client return nil } err = server.Send(resp) if err != nil { return err } } } // Actions implementation for the stub server func (s *StubServer) Actions(server proto.ElasticAgent_ActionsServer) error { var m sync.Mutex done := make(chan bool) go func() { for { select { case <-done: return case act := <-s.ActionsChan: id := uuid.Must(uuid.NewV4()) m.Lock() s.SentActions[id.String()] = act m.Unlock() err := server.Send(&proto.ActionRequest{ Id: id.String(), Name: act.Name, Params: act.Params, }) if err != nil { panic(err) } } } }() defer close(done) for { response, err := server.Recv() if err != nil { return err } err = s.ActionImpl(response) if err != nil { // close connection to client return nil } m.Lock() action, ok := s.SentActions[response.Id] if !ok { // nothing to do, unknown action m.Unlock() continue } delete(s.SentActions, response.Id) m.Unlock() var result map[string]interface{} err = json.Unmarshal(response.Result, &result) if err != nil { return err } if response.Status == proto.ActionResponse_FAILED { error, ok := result["error"] if ok { err = fmt.Errorf("%s", error) } else { err = fmt.Errorf("unknown error") } action.Callback(nil, err) } else { action.Callback(result, nil) } } } // PerformAction implementation for the stub server func (s *StubServer) PerformAction(name string, params map[string]interface{}) (map[string]interface{}, error) { paramBytes, err := json.Marshal(params) if err != nil { return nil, err } resCh := make(chan actionResultCh) s.ActionsChan <- &PerformAction{ Name: name, Params: paramBytes, Callback: func(m map[string]interface{}, err error) { resCh <- actionResultCh{ Result: m, Err: err, } }, } res := <-resCh return res.Result, res.Err }