router/pkg/grpcconnector/grpc_plugin.go (169 lines of code) (raw):
package grpcconnector
import (
"context"
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/go-plugin"
"go.uber.org/zap"
"google.golang.org/grpc"
)
// GRPCPluginConfig carries the settings NewGRPCPlugin needs to build a GRPCPlugin.
// NewGRPCPlugin rejects the config when any of the three fields is left at its zero value.
type GRPCPluginConfig struct {
// Logger is used for the plugin's own error logging; fork also wraps it with
// NewPluginLogger and hands it to the go-plugin client.
Logger *zap.Logger
// PluginPath is the filesystem path of the plugin binary. Before each launch it
// must exist, not be a directory, be non-empty and have an execute bit set.
PluginPath string
// PluginName is the key under which the plugin is registered with and dispensed
// from go-plugin. Name returns it.
PluginName string
}
// GRPCPlugin manages a plugin binary launched through hashicorp/go-plugin and
// exposes the gRPC connection to it. Start launches the process and Stop shuts
// it down.
type GRPCPlugin struct {
// plugin.Plugin and plugin.GRPCPlugin are embedded; fork registers the
// *GRPCPlugin itself in ClientConfig.Plugins, and GRPCClient is defined on
// this type directly.
plugin.Plugin
plugin.GRPCPlugin
logger *zap.Logger
// done is closed by Stop; the goroutines spawned by Start exit once it is closed.
done chan struct{}
// mu is held by Stop for the duration of the shutdown.
mu sync.Mutex
// disposed is set by Stop once shutdown has run so that later Stop calls return early.
disposed atomic.Bool
pluginPath string
pluginName string
// client is nil until the first successful fork; later forks swap the new
// process and connection into the same value via setClients.
client *GRPCPluginClient
}
// NewGRPCPlugin checks config and builds a GRPCPlugin that has not been started.
//
// The checks run in a fixed order and the first failure wins: a nil Logger,
// then an empty PluginName, then an empty PluginPath. The plugin binary itself
// is not touched here; it is only validated when the process is launched.
func NewGRPCPlugin(config GRPCPluginConfig) (*GRPCPlugin, error) {
	switch {
	case config.Logger == nil:
		return nil, fmt.Errorf("logger is required")
	case config.PluginName == "":
		return nil, fmt.Errorf("plugin name is required")
	case config.PluginPath == "":
		return nil, fmt.Errorf("plugin path is required")
	}

	grpcPlugin := &GRPCPlugin{
		logger:     config.Logger,
		pluginName: config.PluginName,
		pluginPath: config.PluginPath,
		// done stays open until Stop closes it.
		done:     make(chan struct{}),
		mu:       sync.Mutex{},
		disposed: atomic.Bool{},
	}
	return grpcPlugin, nil
}
// GetClient implements Plugin.
// It returns the plugin's gRPC client connection, or nil while no client has
// been created yet (p.client is only set by a successful fork).
func (p *GRPCPlugin) GetClient() grpc.ClientConnInterface {
	if client := p.client; client != nil {
		return client
	}
	// Return an untyped nil rather than the nil *GRPCPluginClient so the
	// resulting interface value compares equal to nil for callers.
	return nil
}
// ensureRunningPluginProcess restarts the plugin process if it has exited.
//
// It runs under p.mu, the same lock Stop holds while shutting down, and does
// nothing once the plugin is disposed. Without that, a check racing with Stop
// would see the process Stop just killed as "exited" and fork a replacement
// that nothing would ever close. Holding the lock across fork means a
// concurrent Stop waits for an in-flight restart and then closes the freshly
// started process.
//
// A failed restart is logged, not returned; the next periodic check retries.
func (p *GRPCPlugin) ensureRunningPluginProcess() {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Nothing to supervise after shutdown or before the first successful fork.
	if p.disposed.Load() || p.client == nil {
		return
	}
	if !p.client.IsPluginProcessExited() {
		return
	}

	if err := p.fork(); err != nil {
		p.logger.Error("failed to restart plugin", zap.Error(err))
	}
}
// fork launches a new plugin process and connects it to p.client.
//
// It validates the plugin binary, starts it through go-plugin using the gRPC
// protocol and the GRPC_DATASOURCE_PLUGIN handshake, dispenses the plugin
// registered under p.pluginName and requires the dispensed value to be a
// grpc.ClientConnInterface. On the first successful call it creates p.client;
// on later calls (restarts) it swaps the new process and connection into the
// existing p.client via setClients.
//
// If any step after the go-plugin client has been created fails, that client
// is killed before returning. fork is retried periodically after a crash, so
// without this every failed attempt could leave a plugin process behind.
// p.client is only assigned on success.
func (p *GRPCPlugin) fork() error {
	filePath, err := p.validatePluginPath()
	if err != nil {
		return fmt.Errorf("failed to validate plugin path: %w", err)
	}

	handshakeConfig := plugin.HandshakeConfig{
		ProtocolVersion:  1,
		MagicCookieKey:   "GRPC_DATASOURCE_PLUGIN",
		MagicCookieValue: "GRPC_DATASOURCE_PLUGIN",
	}

	pluginCmd := newPluginCommand(filePath)

	pluginClient := plugin.NewClient(&plugin.ClientConfig{
		Cmd:              pluginCmd,
		AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
		HandshakeConfig:  handshakeConfig,
		Logger:           NewPluginLogger(p.logger),
		Plugins: map[string]plugin.Plugin{
			p.pluginName: p,
		},
	})

	// Until ownership of pluginClient has been handed to p.client, any early
	// return must tear the subprocess down again.
	handedOff := false
	defer func() {
		if !handedOff {
			pluginClient.Kill()
		}
	}()

	clientProtocol, err := pluginClient.Client()
	if err != nil {
		return fmt.Errorf("failed to create plugin client protocol: %w", err)
	}

	rawClient, err := clientProtocol.Dispense(p.pluginName)
	if err != nil {
		return fmt.Errorf("failed to dispense plugin: %w", err)
	}

	grpcClient, ok := rawClient.(grpc.ClientConnInterface)
	if !ok {
		return fmt.Errorf("plugin does not implement grpc.ClientConnInterface")
	}

	if p.client == nil {
		// First start: create the long-lived client wrapper.
		client, err := newGRPCPluginClient(pluginClient, grpcClient)
		if err != nil {
			return fmt.Errorf("failed to create grpc plugin client: %w", err)
		}
		p.client = client
		handedOff = true
		return nil
	}

	// Restart: keep the existing wrapper and point it at the new process.
	p.client.setClients(pluginClient, grpcClient)
	handedOff = true
	return nil
}
// Name implements Plugin.
// It returns the plugin name taken from GRPCPluginConfig.PluginName, the same
// key fork uses to register and dispense the plugin.
func (p *GRPCPlugin) Name() string {
return p.pluginName
}
// Start implements Plugin.
//
// It launches the plugin process and, only once that has succeeded, starts two
// background goroutines:
//
//   - a watcher that calls Stop when ctx is cancelled, and
//   - a monitor that checks every two seconds whether the process has exited
//     and restarts it if so.
//
// Both goroutines exit when Stop closes p.done. If the initial launch fails,
// the error is returned and no goroutine is started. The watcher is started
// after the launch on purpose: a cancellation that arrives while the process
// is still starting is then handled by a Stop that can see and close the
// started client, instead of running before the client exists.
func (p *GRPCPlugin) Start(ctx context.Context) error {
	if err := p.fork(); err != nil {
		return fmt.Errorf("failed to start plugin process: %w", err)
	}

	// Tie the plugin's lifetime to ctx.
	go func() {
		select {
		case <-ctx.Done():
			if err := p.Stop(); err != nil {
				p.logger.Error("failed to stop plugin", zap.Error(err))
			}
		case <-p.done:
		}
	}()

	// Supervise the process until the plugin is stopped.
	go func() {
		const checkInterval = 2 * time.Second

		// One reusable timer instead of a fresh time.After per iteration. It is
		// re-armed after each check, so the interval is measured from the end
		// of the previous check.
		timer := time.NewTimer(checkInterval)
		defer timer.Stop()

		for {
			select {
			case <-p.done:
				return
			case <-timer.C:
				p.ensureRunningPluginProcess()
				timer.Reset(checkInterval)
			}
		}
	}()

	return nil
}
// Stop implements Plugin.
//
// It closes the plugin client (if one was created), marks the plugin as
// disposed and closes p.done so the goroutines started by Start exit. It
// returns the error from closing the client, if any.
//
// Stop is idempotent and safe to call concurrently: the shutdown runs exactly
// once and every other call returns nil. This matters because Stop can be
// invoked both by the owner and by the context watcher started in Start.
func (p *GRPCPlugin) Stop() error {
	// Cheap fast path for the common "already stopped" case.
	if p.disposed.Load() {
		return nil
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	// Re-check under the lock: another caller may have passed the fast path at
	// the same time and finished the shutdown while this one was waiting.
	// Without this, the second caller would close p.done again and panic.
	if p.disposed.Load() {
		return nil
	}

	var retErr error
	if p.client != nil {
		if err := p.client.Close(); err != nil {
			retErr = errors.Join(retErr, err)
		}
	}

	p.disposed.Store(true)
	close(p.done)

	return retErr
}
// GRPCClient implements plugin.GRPCPlugin.
// It returns conn unchanged and never returns an error; ctx and broker are unused.
// NOTE(review): presumably this is the value go-plugin hands back from Dispense,
// which fork then type-asserts to grpc.ClientConnInterface — confirm against go-plugin.
func (p *GRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, conn *grpc.ClientConn) (interface{}, error) {
return conn, nil
}
// validatePluginPath checks that p.pluginPath points at something that can be
// launched and returns the path unchanged.
//
// It fails when the path cannot be stat'ed, is a directory, is an empty file,
// or has none of the owner/group/other execute bits set. The checks run in
// that order and the first failure is returned.
func (p *GRPCPlugin) validatePluginPath() (string, error) {
	stat, err := os.Stat(p.pluginPath)
	if err != nil {
		return "", fmt.Errorf("failed to stat plugin: %w", err)
	}

	switch {
	case stat.IsDir():
		return "", fmt.Errorf("plugin is a directory")
	case stat.Size() == 0:
		return "", fmt.Errorf("plugin is empty")
	case stat.Mode()&0o111 == 0:
		// No execute permission for anyone.
		return "", fmt.Errorf("plugin is not executable")
	}

	return p.pluginPath, nil
}