t/grpc_server_example/main.go (264 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/helloworld.proto //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/import.proto //go:generate protoc --include_imports --descriptor_set_out=proto.pb --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/src.proto //go:generate protoc --descriptor_set_out=echo.pb --include_imports --proto_path=$PWD/proto echo.proto //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/echo.proto // Package main implements a server for Greeter service. package main import ( "context" "crypto/tls" "crypto/x509" "flag" "fmt" "io" "log" "net" "net/http" "os" "os/signal" "strings" "syscall" "time" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/reflection" "google.golang.org/grpc/status" pb "github.com/api7/grpc_server_example/proto" ) var ( grpcAddr = ":10051" grpcsAddr = ":10052" grpcsMtlsAddr string grpcHTTPAddr string crtFilePath = "../t/cert/apisix.crt" keyFilePath = "../t/cert/apisix.key" caFilePath string ) func init() { flag.StringVar(&grpcAddr, "grpc-address", grpcAddr, "address for grpc") flag.StringVar(&grpcsAddr, "grpcs-address", grpcsAddr, "address for grpcs") flag.StringVar(&grpcsMtlsAddr, "grpcs-mtls-address", grpcsMtlsAddr, "address for grpcs in mTLS") flag.StringVar(&grpcHTTPAddr, "grpc-http-address", grpcHTTPAddr, "addresses for http and grpc services at the same time") flag.StringVar(&crtFilePath, "crt", crtFilePath, "path to certificate") flag.StringVar(&keyFilePath, "key", keyFilePath, "path to key") flag.StringVar(&caFilePath, "ca", caFilePath, "path to ca") } // server is used to implement helloworld.GreeterServer. type server struct { // Embed the unimplemented server pb.UnimplementedGreeterServer pb.UnimplementedTestImportServer pb.UnimplementedEchoServer } // SayHello implements helloworld.GreeterServer func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { log.Printf("Received: %v", in.Name) log.Printf("Enum Gender: %v", in.GetGender()) msg := "Hello " + in.Name person := in.GetPerson() if person != nil { if person.GetName() != "" { msg += fmt.Sprintf(", name: %v", person.GetName()) } if person.GetAge() != 0 { msg += fmt.Sprintf(", age: %v", person.GetAge()) } } return &pb.HelloReply{ Message: msg, Items: in.GetItems(), Gender: in.GetGender(), }, nil } // GetErrResp implements helloworld.GreeterServer func (s *server) GetErrResp(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { st := status.New(codes.Unavailable, "Out of service") st, err := st.WithDetails(&pb.ErrorDetail{ Code: 1, Message: "The server is out of service", Type: "service", }) if err != nil { panic(fmt.Sprintf("Unexpected error attaching metadata: %v", err)) } return nil, st.Err() } func (s *server) SayHelloAfterDelay(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { select { case <-time.After(1 * time.Second): fmt.Println("overslept") case <-ctx.Done(): errStr := ctx.Err().Error() if ctx.Err() == context.DeadlineExceeded { return nil, status.Error(codes.DeadlineExceeded, errStr) } } time.Sleep(1 * time.Second) log.Printf("Received: %v", in.Name) return &pb.HelloReply{Message: "Hello delay " + in.Name}, nil } func (s *server) Plus(ctx context.Context, in *pb.PlusRequest) (*pb.PlusReply, error) { log.Printf("Received: %v %v", in.A, in.B) return &pb.PlusReply{Result: in.A + in.B}, nil } func (s *server) EchoStruct(ctx context.Context, in *pb.StructRequest) (*pb.StructReply, error) { log.Printf("Received: %+v", in) return &pb.StructReply{ Data: in.Data, }, nil } // SayHelloServerStream streams HelloReply back to the client. func (s *server) SayHelloServerStream(req *pb.HelloRequest, stream pb.Greeter_SayHelloServerStreamServer) error { log.Printf("Received server side stream req: %v\n", req) // Say Hello 5 times. for i := 0; i < 5; i++ { if err := stream.Send(&pb.HelloReply{ Message: fmt.Sprintf("Hello %s", req.Name), }); err != nil { return status.Errorf(codes.Unavailable, "Unable to stream request back to client: %v", err) } } return nil } // SayHelloClientStream receives a stream of HelloRequest from a client. func (s *server) SayHelloClientStream(stream pb.Greeter_SayHelloClientStreamServer) error { log.Println("SayHello client side streaming has been initiated.") cache := "" for { req, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&pb.HelloReply{Message: cache}) } if err != nil { return status.Errorf(codes.Unavailable, "Failed to read client stream: %v", err) } cache = fmt.Sprintf("%sHello %s!", cache, req.Name) } } // SayHelloBidirectionalStream establishes a bidirectional stream with the client. func (s *server) SayHelloBidirectionalStream(stream pb.Greeter_SayHelloBidirectionalStreamServer) error { log.Println("SayHello bidirectional streaming has been initiated.") for { req, err := stream.Recv() if err == io.EOF { return stream.Send(&pb.HelloReply{Message: "stream ended"}) } if err != nil { return status.Errorf(codes.Unavailable, "Failed to read client stream: %v", err) } // A small 0.5 sec sleep time.Sleep(500 * time.Millisecond) if err := stream.Send(&pb.HelloReply{Message: fmt.Sprintf("Hello %s", req.Name)}); err != nil { return status.Errorf(codes.Unknown, "Failed to stream response back to client: %v", err) } } } // SayMultipleHello implements helloworld.GreeterServer func (s *server) SayMultipleHello(ctx context.Context, in *pb.MultipleHelloRequest) (*pb.MultipleHelloReply, error) { log.Printf("Received: %v", in.Name) log.Printf("Enum Gender: %v", in.GetGenders()) msg := "Hello " + in.Name persons := in.GetPersons() if persons != nil { for _, person := range persons { if person.GetName() != "" { msg += fmt.Sprintf(", name: %v", person.GetName()) } if person.GetAge() != 0 { msg += fmt.Sprintf(", age: %v", person.GetAge()) } } } return &pb.MultipleHelloReply{ Message: msg, Items: in.GetItems(), Genders: in.GetGenders(), }, nil } func (s *server) Run(ctx context.Context, in *pb.Request) (*pb.Response, error) { return &pb.Response{Body: in.User.Name + " " + in.Body}, nil } func gRPCAndHTTPFunc(grpcServer *grpc.Server) http.Handler { return h2c.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("hello http")) }) if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { grpcServer.ServeHTTP(w, r) } else { mux.ServeHTTP(w, r) } }), &http2.Server{}) } func main() { flag.Parse() go func() { lis, err := net.Listen("tcp", grpcAddr) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() reflection.Register(s) pb.RegisterGreeterServer(s, &server{}) pb.RegisterTestImportServer(s, &server{}) pb.RegisterEchoServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }() go func() { lis, err := net.Listen("tcp", grpcsAddr) if err != nil { log.Fatalf("failed to listen: %v", err) } c, err := credentials.NewServerTLSFromFile(crtFilePath, keyFilePath) if err != nil { log.Fatalf("credentials.NewServerTLSFromFile err: %v", err) } s := grpc.NewServer(grpc.Creds(c)) reflection.Register(s) pb.RegisterGreeterServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }() if grpcHTTPAddr != "" { go func() { lis, err := net.Listen("tcp", grpcHTTPAddr) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() reflection.Register(s) pb.RegisterGreeterServer(s, &server{}) pb.RegisterTestImportServer(s, &server{}) if err := http.Serve(lis, gRPCAndHTTPFunc(s)); err != nil { log.Fatalf("failed to serve grpc: %v", err) } }() } if grpcsMtlsAddr != "" { go func() { lis, err := net.Listen("tcp", grpcsMtlsAddr) if err != nil { log.Fatalf("failed to listen: %v", err) } certificate, err := tls.LoadX509KeyPair(crtFilePath, keyFilePath) if err != nil { log.Fatalf("could not load server key pair: %s", err) } certPool := x509.NewCertPool() ca, err := os.ReadFile(caFilePath) if err != nil { log.Fatalf("could not read ca certificate: %s", err) } if ok := certPool.AppendCertsFromPEM(ca); !ok { log.Fatalf("failed to append client certs") } c := credentials.NewTLS(&tls.Config{ ClientAuth: tls.RequireAndVerifyClientCert, Certificates: []tls.Certificate{certificate}, ClientCAs: certPool, }) s := grpc.NewServer(grpc.Creds(c)) reflection.Register(s) pb.RegisterGreeterServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }() } signals := make(chan os.Signal) signal.Notify(signals, os.Interrupt, syscall.SIGTERM) sig := <-signals log.Printf("get signal %s, exit\n", sig.String()) }