router/pkg/grpcconnector/grpc_remote.go (65 lines of code) (raw):
package grpcconnector
import (
"context"
"fmt"
"io"
"sync"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// Ensure GRPCStandaloneProvider implements the ClientProvider interface
var _ ClientProvider = (*RemoteGRPCProvider)(nil)
// RemoteGRPCProvider is a client provider that manages a gRPC client connection to a standalone gRPC server.
// It is used to connect to a standalone gRPC server that is not part of the cosmo cluster.
// The provider maintains a single client connection and provides thread-safe access to it.
type RemoteGRPCProvider struct {
logger *zap.Logger
name string
endpoint string
cc grpc.ClientConnInterface
mu sync.RWMutex
}
// RemoteGRPCProviderConfig holds the configuration parameters for creating a new RemoteGRPCProvider.
type RemoteGRPCProviderConfig struct {
// Logger is the zap logger instance to use for logging. If nil, a no-op logger will be used.
Logger *zap.Logger
// Name is the name of the subgraph this provider is connecting to.
Name string
// Endpoint is the URL of the gRPC server to connect to.
Endpoint string
}
// NewRemoteGRPCProvider creates a new RemoteGRPCProvider with the given configuration.
// It validates the configuration parameters and returns an error if any required parameters are missing.
func NewRemoteGRPCProvider(config RemoteGRPCProviderConfig) (*RemoteGRPCProvider, error) {
if config.Logger == nil {
config.Logger = zap.NewNop()
}
if config.Name == "" {
return nil, fmt.Errorf("subgraph name is required")
}
if config.Endpoint == "" {
return nil, fmt.Errorf("endpoint is required")
}
return &RemoteGRPCProvider{
logger: config.Logger,
name: config.Name,
endpoint: config.Endpoint,
}, nil
}
// GetClient returns the gRPC client connection interface.
// This method is thread-safe and can be called concurrently.
func (g *RemoteGRPCProvider) GetClient() grpc.ClientConnInterface {
g.mu.RLock()
defer g.mu.RUnlock()
return g.cc
}
// Name returns the name of the provider.
func (g *RemoteGRPCProvider) Name() string {
return g.name
}
// Start initializes the gRPC client connection if it hasn't been created yet.
// It parses the endpoint URL and creates a new insecure gRPC connection.
func (g *RemoteGRPCProvider) Start(ctx context.Context) error {
if g.cc == nil {
clientConn, err := grpc.NewClient(g.endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("failed to create client connection: %w", err)
}
g.cc = clientConn
}
return nil
}
// Stop closes the gRPC client connection if it implements the io.Closer interface.
// This method is thread-safe.
func (g *RemoteGRPCProvider) Stop() error {
g.mu.Lock()
defer g.mu.Unlock()
if closer, ok := g.cc.(io.Closer); ok {
return closer.Close()
}
return nil
}