in banyand/queue/sub/server.go [159:247]
func (s *server) Serve() run.StopNotify {
var opts []grpclib.ServerOption
if s.tls {
opts = []grpclib.ServerOption{grpclib.Creds(s.creds)}
}
grpcPanicRecoveryHandler := func(p any) (err error) {
s.log.Error().Interface("panic", p).Str("stack", string(debug.Stack())).Msg("recovered from panic")
return status.Errorf(codes.Internal, "%s", p)
}
streamChain := []grpclib.StreamServerInterceptor{
recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
}
unaryChain := []grpclib.UnaryServerInterceptor{
grpc_validator.UnaryServerInterceptor(),
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
}
opts = append(opts, grpclib.MaxRecvMsgSize(int(s.maxRecvMsgSize)),
grpclib.ChainUnaryInterceptor(unaryChain...),
grpclib.ChainStreamInterceptor(streamChain...),
)
s.ser = grpclib.NewServer(opts...)
clusterv1.RegisterServiceServer(s.ser, s)
grpc_health_v1.RegisterHealthServer(s.ser, health.NewServer())
databasev1.RegisterSnapshotServiceServer(s.ser, s)
streamv1.RegisterStreamServiceServer(s.ser, &streamService{ser: s})
measurev1.RegisterMeasureServiceServer(s.ser, &measureService{ser: s})
var ctx context.Context
ctx, s.clientCloser = context.WithCancel(context.Background())
clientOpts := make([]grpclib.DialOption, 0, 1)
if s.creds == nil {
clientOpts = append(clientOpts, grpclib.WithTransportCredentials(insecure.NewCredentials()))
} else {
clientOpts = append(clientOpts, grpclib.WithTransportCredentials(s.creds))
}
stopCh := make(chan struct{})
client, err := healthcheck.NewClient(ctx, s.log, s.addr, clientOpts)
if err != nil {
s.log.Error().Err(err).Msg("Failed to health check client")
close(stopCh)
return stopCh
}
gwMux := runtime.NewServeMux(runtime.WithHealthzEndpoint(client))
if err := databasev1.RegisterSnapshotServiceHandlerFromEndpoint(ctx, gwMux, s.addr, clientOpts); err != nil {
s.log.Error().Err(err).Msg("Failed to register snapshot service")
close(stopCh)
return stopCh
}
mux := chi.NewRouter()
mux.Mount("/api", http.StripPrefix("/api", gwMux))
s.httpSrv = &http.Server{
Addr: s.httpAddr,
Handler: mux,
ReadHeaderTimeout: 3 * time.Second,
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
lis, err := net.Listen("tcp", s.addr)
if err != nil {
s.log.Error().Err(err).Msg("Failed to listen")
close(stopCh)
return
}
s.log.Info().Str("addr", s.addr).Msg("Listening to")
err = s.ser.Serve(lis)
if err != nil {
s.log.Error().Err(err).Msg("server is interrupted")
}
wg.Done()
}()
go func() {
s.log.Info().Str("listenAddr", s.httpAddr).Msg("Start healthz http server")
err := s.httpSrv.ListenAndServe()
if err != http.ErrServerClosed {
s.log.Error().Err(err)
}
wg.Done()
}()
go func() {
wg.Wait()
s.log.Info().Msg("All servers are stopped")
close(stopCh)
}()
return stopCh
}