in banyand/liaison/grpc/server.go [189:250]
// Serve assembles the gRPC server, registers every service on it, and starts
// accepting connections on s.addr from a background goroutine.
//
// The returned channel is closed once the listener fails to bind or the
// underlying grpc Serve call returns, so callers can wait on it to learn that
// the server is no longer running. Listen and serve errors are logged rather
// than returned.
func (s *server) Serve() run.StopNotify {
	var opts []grpclib.ServerOption
	// Transport credentials are only attached when TLS is enabled.
	if s.tls {
		opts = []grpclib.ServerOption{grpclib.Creds(s.creds)}
	}

	// Converts a recovered handler panic into an Internal status after logging
	// the panic value together with the current stack trace.
	grpcPanicRecoveryHandler := func(p any) error {
		s.log.Error().Interface("panic", p).Str("stack", string(debug.Stack())).Msg("recovered from panic")
		// %v rather than %s: the panic value can be of any type, and %s renders
		// values that are neither strings nor errors as "%!s(...)" noise.
		return status.Errorf(codes.Internal, "%v", p)
	}

	// The metrics interceptors may be nil; they are appended to the chains
	// only when present.
	unaryMetrics, streamMetrics := observability.MetricsServerInterceptor()

	streamChain := []grpclib.StreamServerInterceptor{
		grpc_validator.StreamServerInterceptor(),
		recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
	}
	if streamMetrics != nil {
		streamChain = append(streamChain, streamMetrics)
	}

	unaryChain := []grpclib.UnaryServerInterceptor{
		grpc_validator.UnaryServerInterceptor(),
		recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
	}
	if unaryMetrics != nil {
		unaryChain = append(unaryChain, unaryMetrics)
	}

	opts = append(opts,
		grpclib.MaxRecvMsgSize(int(s.maxRecvMsgSize)),
		grpclib.ChainUnaryInterceptor(unaryChain...),
		grpclib.ChainStreamInterceptor(streamChain...),
	)
	s.ser = grpclib.NewServer(opts...)

	// Data services.
	streamv1.RegisterStreamServiceServer(s.ser, s.streamSVC)
	measurev1.RegisterMeasureServiceServer(s.ser, s.measureSVC)
	// Registry services.
	databasev1.RegisterGroupRegistryServiceServer(s.ser, s.groupRegistryServer)
	databasev1.RegisterIndexRuleBindingRegistryServiceServer(s.ser, s.indexRuleBindingRegistryServer)
	databasev1.RegisterIndexRuleRegistryServiceServer(s.ser, s.indexRuleRegistryServer)
	databasev1.RegisterStreamRegistryServiceServer(s.ser, s.streamRegistryServer)
	databasev1.RegisterMeasureRegistryServiceServer(s.ser, s.measureRegistryServer)
	propertyv1.RegisterPropertyServiceServer(s.ser, s.propertyServer)
	databasev1.RegisterTopNAggregationRegistryServiceServer(s.ser, s.topNAggregationRegistryServer)
	// Health checking service.
	grpc_health_v1.RegisterHealthServer(s.ser, health.NewServer())

	s.stopCh = make(chan struct{})
	go func() {
		// Signal termination on every exit path: a failed bind as well as the
		// return of the serve loop.
		defer close(s.stopCh)

		lis, err := net.Listen("tcp", s.addr)
		if err != nil {
			s.log.Error().Err(err).Msg("Failed to listen")
			return
		}
		s.log.Info().Str("addr", s.addr).Msg("Listening to")
		if err = s.ser.Serve(lis); err != nil {
			s.log.Error().Err(err).Msg("server is interrupted")
		}
	}()
	return s.stopCh
}