router/pkg/grpcconnector/grpc_plugin_client.go (118 lines of code) (raw):

package grpcconnector import ( "context" "errors" "sync" "sync/atomic" "time" "github.com/hashicorp/go-plugin" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) type GRPCPluginClient struct { isClosed atomic.Bool pc *plugin.Client cc grpc.ClientConnInterface config GRPCPluginClientConfig mu sync.RWMutex } type GRPCPluginClientConfig struct { ReconnectTimeout time.Duration PingInterval time.Duration } var defaultGRPCPluginClientConfig = GRPCPluginClientConfig{ ReconnectTimeout: time.Second * 20, PingInterval: time.Second * 2, } type GRPCPluginClientOption func(*GRPCPluginClientConfig) func WithReconnectConfig(reconnectTimeout time.Duration, pingInterval time.Duration) GRPCPluginClientOption { return func(c *GRPCPluginClientConfig) { c.ReconnectTimeout = reconnectTimeout c.PingInterval = pingInterval } } var _ grpc.ClientConnInterface = &GRPCPluginClient{} func newGRPCPluginClient(pc *plugin.Client, cc grpc.ClientConnInterface, options ...GRPCPluginClientOption) (*GRPCPluginClient, error) { if pc == nil || cc == nil { return nil, errors.New("plugin client or grpc client conn is nil") } config := defaultGRPCPluginClientConfig for _, option := range options { option(&config) } return &GRPCPluginClient{ pc: pc, cc: cc, config: config, }, nil } func (g *GRPCPluginClient) waitForPluginToBeActive() error { timeout := time.After(g.config.ReconnectTimeout) for { select { case <-timeout: return errors.New("plugin was not active in time") default: isActive, err := g.isPluginActive() if err != nil { return err } if isActive { return nil } } } } // isPluginActive checks if the plugin is active by pinging it. // Returns true if the plugin is active, false if it is not, and an error if there is an error. func (g *GRPCPluginClient) isPluginActive() (bool, error) { g.mu.RLock() defer g.mu.RUnlock() if g.pc == nil { return false, nil } clientProtocol, err := g.pc.Client() if err != nil { return false, err } if err := clientProtocol.Ping(); err != nil { time.Sleep(g.config.PingInterval) return false, nil } return true, nil } func (g *GRPCPluginClient) setClients(pluginClient *plugin.Client, clientConn grpc.ClientConnInterface) { // We need to lock here to avoid race conditions // We potentially access the plugin clients during invokes g.mu.Lock() defer g.mu.Unlock() g.pc = pluginClient g.cc = clientConn } // Invoke implements grpc.ClientConnInterface. func (g *GRPCPluginClient) Invoke(ctx context.Context, method string, args any, reply any, opts ...grpc.CallOption) error { if g.IsPluginProcessExited() { if err := g.waitForPluginToBeActive(); err != nil { return err } } if g.isClosed.Load() { return status.Error(codes.Unavailable, "plugin is not active") } g.mu.RLock() defer g.mu.RUnlock() return g.cc.Invoke(ctx, method, args, reply, opts...) } // NewStream implements grpc.ClientConnInterface. func (g *GRPCPluginClient) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { return nil, status.Error(codes.Unavailable, "streaming is currently not supported") } // IsPluginProcessExited checks if the plugin process has exited. func (g *GRPCPluginClient) IsPluginProcessExited() bool { g.mu.RLock() defer g.mu.RUnlock() return g.pc == nil || g.pc.Exited() } func (g *GRPCPluginClient) Close() error { if g.pc.Exited() || g.isClosed.Load() { return nil } g.isClosed.Store(true) g.pc.Kill() g.cc = nil return nil }