func()

in banyand/queue/sub/server.go [159:247]


func (s *server) Serve() run.StopNotify {
	var opts []grpclib.ServerOption
	if s.tls {
		opts = []grpclib.ServerOption{grpclib.Creds(s.creds)}
	}
	grpcPanicRecoveryHandler := func(p any) (err error) {
		s.log.Error().Interface("panic", p).Str("stack", string(debug.Stack())).Msg("recovered from panic")

		return status.Errorf(codes.Internal, "%s", p)
	}

	streamChain := []grpclib.StreamServerInterceptor{
		recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
	}
	unaryChain := []grpclib.UnaryServerInterceptor{
		grpc_validator.UnaryServerInterceptor(),
		recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
	}

	opts = append(opts, grpclib.MaxRecvMsgSize(int(s.maxRecvMsgSize)),
		grpclib.ChainUnaryInterceptor(unaryChain...),
		grpclib.ChainStreamInterceptor(streamChain...),
	)
	s.ser = grpclib.NewServer(opts...)
	clusterv1.RegisterServiceServer(s.ser, s)
	grpc_health_v1.RegisterHealthServer(s.ser, health.NewServer())
	databasev1.RegisterSnapshotServiceServer(s.ser, s)
	streamv1.RegisterStreamServiceServer(s.ser, &streamService{ser: s})
	measurev1.RegisterMeasureServiceServer(s.ser, &measureService{ser: s})

	var ctx context.Context
	ctx, s.clientCloser = context.WithCancel(context.Background())
	clientOpts := make([]grpclib.DialOption, 0, 1)
	if s.creds == nil {
		clientOpts = append(clientOpts, grpclib.WithTransportCredentials(insecure.NewCredentials()))
	} else {
		clientOpts = append(clientOpts, grpclib.WithTransportCredentials(s.creds))
	}
	stopCh := make(chan struct{})
	client, err := healthcheck.NewClient(ctx, s.log, s.addr, clientOpts)
	if err != nil {
		s.log.Error().Err(err).Msg("Failed to health check client")
		close(stopCh)
		return stopCh
	}
	gwMux := runtime.NewServeMux(runtime.WithHealthzEndpoint(client))
	if err := databasev1.RegisterSnapshotServiceHandlerFromEndpoint(ctx, gwMux, s.addr, clientOpts); err != nil {
		s.log.Error().Err(err).Msg("Failed to register snapshot service")
		close(stopCh)
		return stopCh
	}
	mux := chi.NewRouter()
	mux.Mount("/api", http.StripPrefix("/api", gwMux))
	s.httpSrv = &http.Server{
		Addr:              s.httpAddr,
		Handler:           mux,
		ReadHeaderTimeout: 3 * time.Second,
	}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		lis, err := net.Listen("tcp", s.addr)
		if err != nil {
			s.log.Error().Err(err).Msg("Failed to listen")
			close(stopCh)
			return
		}
		s.log.Info().Str("addr", s.addr).Msg("Listening to")
		err = s.ser.Serve(lis)
		if err != nil {
			s.log.Error().Err(err).Msg("server is interrupted")
		}
		wg.Done()
	}()
	go func() {
		s.log.Info().Str("listenAddr", s.httpAddr).Msg("Start healthz http server")
		err := s.httpSrv.ListenAndServe()
		if err != http.ErrServerClosed {
			s.log.Error().Err(err)
		}
		wg.Done()
	}()
	go func() {
		wg.Wait()
		s.log.Info().Msg("All servers are stopped")
		close(stopCh)
	}()
	return stopCh
}