pulsar-function-go/pf/instanceControlServicer.go (48 lines of code) (raw):

// // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // package pf import ( "context" "fmt" "net" log "github.com/apache/pulsar/pulsar-function-go/logutil" pb "github.com/apache/pulsar/pulsar-function-go/pb" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc" ) type InstanceControlServicer struct { goInstance *goInstance } func (icServicer *InstanceControlServicer) GetFunctionStatus( ctx context.Context, req *empty.Empty) (*pb.FunctionStatus, error) { return icServicer.goInstance.getFunctionStatus(), nil //return nil, status.Errorf(codes.Unimplemented, "method GetFunctionStatus not implemented") } func (icServicer *InstanceControlServicer) GetAndResetMetrics( ctx context.Context, req *empty.Empty) (*pb.MetricsData, error) { return icServicer.goInstance.getAndResetMetrics(), nil } func (icServicer *InstanceControlServicer) ResetMetrics( ctx context.Context, req *empty.Empty) (*empty.Empty, error) { return icServicer.goInstance.resetMetrics(), nil } func (icServicer *InstanceControlServicer) GetMetrics( ctx context.Context, req *empty.Empty) (*pb.MetricsData, error) { return icServicer.goInstance.getMetrics(), nil } func (icServicer *InstanceControlServicer) HealthCheck( ctx context.Context, req *empty.Empty) (*pb.HealthCheckResult, error) { return icServicer.goInstance.healthCheck(), nil } func (icServicer *InstanceControlServicer) serve(goInstance *goInstance) *grpc.Server { // create a listener on TCP port lis, err := net.Listen("tcp", fmt.Sprintf(":%d", goInstance.context.GetPort())) if err != nil { log.Fatalf("failed to listen: %v", err) } // create a gRPC server object grpcServer := grpc.NewServer() // must register before we start the service. pb.RegisterInstanceControlServer(grpcServer, icServicer) // start the server log.Infof("Serving InstanceCommunication on port %d", goInstance.context.GetPort()) go func() { if err := grpcServer.Serve(lis); err != nil { log.Fatalf("Server exited with error: %v", err) } }() return grpcServer }