golang/rpc_client.go (137 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package golang import ( "context" "errors" "fmt" "sync" "time" v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2" // "google.golang.org/protobuf/types/known/durationpb" ) var ( ErrNoAvailableBrokers = errors.New("rocketmq: no available brokers") ) type RpcClient interface { GracefulStop() error HeartBeat(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error) QueryRoute(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error) SendMessage(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error) Telemetry(ctx context.Context) (v2.MessagingService_TelemetryClient, error) EndTransaction(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error) NotifyClientTermination(ctx context.Context, request *v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, error) ReceiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error) AckMessage(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error) ChangeInvisibleDuration(ctx context.Context, request *v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, error) idleDuration() time.Duration GetTarget() string } var _ = RpcClient(&rpcClient{}) type rpcClient struct { opts rpcClientOptions mux sync.Mutex conn ClientConn msc v2.MessagingServiceClient target string activityNanoTime time.Time } /* * Memory safety: * - target, opts, msc are never mutated after creation so it can be read concurrently without locking the mutex * - activityNanoTime is read and written to across threads so the mutex must be locked before read or write operations * - conn is autogenerated and designed to be accessed concurrently but closing the connection is not atomic (I think so I haven't been able to confirm it) * so let's assume to access or mutate it locking the mutex is required. */ var NewRpcClient = func(target string, opts ...RpcClientOption) (RpcClient, error) { rc := &rpcClient{ target: target, opts: defaultRpcClientOptions, } for _, opt := range opts { opt.apply(&rc.opts) } conn, err := rc.opts.clientConnFunc(target, rc.opts.connOptions...) if err != nil { return nil, fmt.Errorf("create grpc conn failed, err=%w", err) } rc.conn = conn rc.msc = v2.NewMessagingServiceClient(conn.Conn()) rc.activityNanoTime = time.Now() sugarBaseLogger.Infof("create rpc client success, target=%v", target) return rc, nil } func (rc *rpcClient) GetTarget() string { return rc.target } func (rc *rpcClient) idleDuration() time.Duration { rc.mux.Lock() duration := time.Since(rc.activityNanoTime) rc.mux.Unlock() return duration } func (rc *rpcClient) Close() {} func (rc *rpcClient) GracefulStop() error { rc.mux.Lock() closeResult := rc.conn.Close() sugarBaseLogger.Warnf("close rpc client, target=%s", rc.target) rc.mux.Unlock() return closeResult } func (rc *rpcClient) QueryRoute(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error) { rc.mux.Lock() rc.activityNanoTime = time.Now() rc.mux.Unlock() resp, err := rc.msc.QueryRoute(ctx, request) sugarBaseLogger.Debugf("queryRoute request: %v, response: %v, err: %v", request, resp, err) return resp, err } func (rc *rpcClient) SendMessage(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error) { rc.mux.Lock() rc.activityNanoTime = time.Now() rc.mux.Unlock() resp, err := rc.msc.SendMessage(ctx, request) sugarBaseLogger.Debugf("sendMessage request: %v, response: %v, err: %v", request, resp, err) return resp, err } func (rc *rpcClient) Telemetry(ctx context.Context) (v2.MessagingService_TelemetryClient, error) { return rc.msc.Telemetry(ctx) } func (rc *rpcClient) EndTransaction(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error) { rc.mux.Lock() rc.activityNanoTime = time.Now() rc.mux.Unlock() resp, err := rc.msc.EndTransaction(ctx, request) sugarBaseLogger.Debugf("endTransaction request: %v, response: %v, err: %v", request, resp, err) return resp, err } func (rc *rpcClient) HeartBeat(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error) { rc.mux.Lock() rc.activityNanoTime = time.Now() rc.mux.Unlock() resp, err := rc.msc.Heartbeat(ctx, request) sugarBaseLogger.Debugf("heartBeat request: %v, response: %v, err: %v", request, resp, err) return resp, err } func (rc *rpcClient) NotifyClientTermination(ctx context.Context, request *v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, error) { rc.mux.Lock() rc.activityNanoTime = time.Now() rc.mux.Unlock() resp, err := rc.msc.NotifyClientTermination(ctx, request) sugarBaseLogger.Debugf("notifyClientTermination request: %v, response: %v, err: %v", request, resp, err) return resp, err } func (rc *rpcClient) ReceiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error) { rc.mux.Lock() rc.activityNanoTime = time.Now() rc.mux.Unlock() resp, err := rc.msc.ReceiveMessage(ctx, request) sugarBaseLogger.Debugf("receiveMessage request: %v, err: %v", request, err) return resp, err } func (rc *rpcClient) AckMessage(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error) { rc.mux.Lock() rc.activityNanoTime = time.Now() rc.mux.Unlock() resp, err := rc.msc.AckMessage(ctx, request) sugarBaseLogger.Debugf("ackMessage request: %v, response: %v, err: %v", request, resp, err) return resp, err } func (rc *rpcClient) ChangeInvisibleDuration(ctx context.Context, request *v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, error) { rc.mux.Lock() rc.activityNanoTime = time.Now() rc.mux.Unlock() resp, err := rc.msc.ChangeInvisibleDuration(ctx, request) sugarBaseLogger.Debugf("changeInvisibleDuration request: %v, response: %v, err: %v", request, resp, err) return resp, err }