protocol/dubbo3/dubbo3_invoker.go (199 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package dubbo3 import ( "context" "reflect" "strconv" "strings" "sync" "time" ) import ( "github.com/dubbogo/gost/log/logger" "github.com/dubbogo/grpc-go/metadata" tripleConstant "github.com/dubbogo/triple/pkg/common/constant" triConfig "github.com/dubbogo/triple/pkg/config" "github.com/dubbogo/triple/pkg/triple" "github.com/dustin/go-humanize" ) import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/protocol" invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation" ) // same as dubbo_invoker.go attachmentKey var attachmentKey = []string{ constant.InterfaceKey, constant.GroupKey, constant.TokenKey, constant.TimeoutKey, constant.VersionKey, tripleConstant.TripleServiceGroup, tripleConstant.TripleServiceVersion, } // DubboInvoker is implement of protocol.Invoker, a dubboInvoker refer to one service and ip. type DubboInvoker struct { protocol.BaseInvoker // the net layer client, it is focus on network communication. client *triple.TripleClient // quitOnce is used to make sure DubboInvoker is only destroyed once quitOnce sync.Once // timeout for service(interface) level. timeout time.Duration // clientGuard is the client lock of dubbo invoker clientGuard *sync.RWMutex } // NewDubboInvoker constructor func NewDubboInvoker(url *common.URL) (*DubboInvoker, error) { rt := config.GetConsumerConfig().RequestTimeout timeout := url.GetParamDuration(constant.TimeoutKey, rt) // for triple pb serialization. The bean name from provider is the provider reference key, // which can't locate the target consumer stub, so we use interface key.. interfaceKey := url.GetParam(constant.InterfaceKey, "") consumerService := config.GetConsumerServiceByInterfaceName(interfaceKey) dubboSerializerType := url.GetParam(constant.SerializationKey, constant.ProtobufSerialization) triCodecType := tripleConstant.CodecType(dubboSerializerType) // new triple client opts := []triConfig.OptionFunction{ triConfig.WithClientTimeout(timeout), triConfig.WithCodecType(triCodecType), triConfig.WithLocation(url.Location), triConfig.WithHeaderAppVersion(url.GetParam(constant.AppVersionKey, "")), triConfig.WithHeaderGroup(url.GetParam(constant.GroupKey, "")), triConfig.WithLogger(logger.GetLogger()), } maxCallRecvMsgSize := constant.DefaultMaxCallRecvMsgSize if maxCall, err := humanize.ParseBytes(url.GetParam(constant.MaxCallRecvMsgSize, "")); err == nil && maxCall != 0 { maxCallRecvMsgSize = int(maxCall) } maxCallSendMsgSize := constant.DefaultMaxCallSendMsgSize if maxCall, err := humanize.ParseBytes(url.GetParam(constant.MaxCallSendMsgSize, "")); err == nil && maxCall != 0 { maxCallSendMsgSize = int(maxCall) } opts = append(opts, triConfig.WithGRPCMaxCallRecvMessageSize(maxCallRecvMsgSize)) opts = append(opts, triConfig.WithGRPCMaxCallSendMessageSize(maxCallSendMsgSize)) tracingKey := url.GetParam(constant.TracingConfigKey, "") if tracingKey != "" { tracingConfig := config.GetTracingConfig(tracingKey) if tracingConfig != nil { if tracingConfig.Name == "jaeger" { if tracingConfig.ServiceName == "" { tracingConfig.ServiceName = config.GetApplicationConfig().Name } opts = append(opts, triConfig.WithJaegerConfig( tracingConfig.Address, tracingConfig.ServiceName, *tracingConfig.UseAgent, )) } else { logger.Warnf("unsupported tracing name %s, now triple only support jaeger", tracingConfig.Name) } } } triOption := triConfig.NewTripleOption(opts...) tlsConfig := config.GetRootConfig().TLSConfig if tlsConfig != nil { triOption.TLSCertFile = tlsConfig.TLSCertFile triOption.TLSKeyFile = tlsConfig.TLSKeyFile triOption.CACertFile = tlsConfig.CACertFile triOption.TLSServerName = tlsConfig.TLSServerName logger.Infof("Triple Client initialized the TLSConfig configuration") } client, err := triple.NewTripleClient(consumerService, triOption) if err != nil { return nil, err } return &DubboInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), client: client, timeout: timeout, clientGuard: &sync.RWMutex{}, }, nil } func (di *DubboInvoker) setClient(client *triple.TripleClient) { di.clientGuard.Lock() defer di.clientGuard.Unlock() di.client = client } func (di *DubboInvoker) getClient() *triple.TripleClient { di.clientGuard.RLock() defer di.clientGuard.RUnlock() return di.client } // Invoke call remoting. func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { var ( result protocol.RPCResult ) if !di.BaseInvoker.IsAvailable() { // Generally, the case will not happen, because the invoker has been removed // from the invoker list before destroy,so no new request will enter the destroyed invoker logger.Warnf("this dubboInvoker is destroyed") result.Err = protocol.ErrDestroyedInvoker return &result } di.clientGuard.RLock() defer di.clientGuard.RUnlock() if di.client == nil { result.Err = protocol.ErrClientClosed return &result } if !di.BaseInvoker.IsAvailable() { // Generally, the case will not happen, because the invoker has been removed // from the invoker list before destroy,so no new request will enter the destroyed invoker logger.Warnf("this grpcInvoker is destroying") result.Err = protocol.ErrDestroyedInvoker return &result } for _, k := range attachmentKey { var paramKey string switch k { case tripleConstant.TripleServiceGroup: paramKey = constant.GroupKey case tripleConstant.TripleServiceVersion: paramKey = constant.VersionKey default: paramKey = k } if v := di.GetURL().GetParam(paramKey, ""); len(v) > 0 { invocation.SetAttachment(k, v) } } // append interface id to ctx gRPCMD := make(metadata.MD, 0) // triple will convert attachment value to []string for k, v := range invocation.Attachments() { if str, ok := v.(string); ok { gRPCMD.Set(k, str) continue } if str, ok := v.([]string); ok { gRPCMD.Set(k, str...) continue } logger.Warnf("[Triple Protocol]Triple attachment value with key = %s is invalid, which should be string or []string", k) } ctx = metadata.NewOutgoingContext(ctx, gRPCMD) ctx = context.WithValue(ctx, tripleConstant.InterfaceKey, di.BaseInvoker.GetURL().GetParam(constant.InterfaceKey, "")) in := make([]reflect.Value, 0, 16) in = append(in, reflect.ValueOf(ctx)) if len(invocation.ParameterValues()) > 0 { in = append(in, invocation.ParameterValues()...) } methodName := invocation.MethodName() triAttachmentWithErr := di.client.Invoke(methodName, in, invocation.Reply()) result.Err = triAttachmentWithErr.GetError() result.Attrs = make(map[string]any) for k, v := range triAttachmentWithErr.GetAttachments() { result.Attrs[k] = v } result.Rest = invocation.Reply() return &result } // get timeout including methodConfig func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) time.Duration { timeout := di.GetURL().GetParam(strings.Join([]string{constant.MethodKeys, invocation.MethodName(), constant.TimeoutKey}, "."), "") if len(timeout) != 0 { if t, err := time.ParseDuration(timeout); err == nil { // config timeout into attachment invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds()))) return t } } // set timeout into invocation at method level invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds()))) return di.timeout } // IsAvailable check if invoker is available, now it is useless func (di *DubboInvoker) IsAvailable() bool { client := di.getClient() if client != nil { // FIXME here can't check if tcp server is started now!!! return client.IsAvailable() } return false } // Destroy destroy dubbo3 client invoker. func (di *DubboInvoker) Destroy() { di.quitOnce.Do(func() { di.BaseInvoker.Destroy() client := di.getClient() if client != nil { di.setClient(nil) client.Close() } }) }