protocol/dubbo/dubbo_protocol.go (165 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package dubbo import ( "context" "fmt" "sync" "time" ) import ( "github.com/dubbogo/gost/log/logger" "github.com/opentracing/opentracing-go" ) import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/invocation" "dubbo.apache.org/dubbo-go/v3/remoting" "dubbo.apache.org/dubbo-go/v3/remoting/getty" ) const ( // DUBBO is dubbo protocol name DUBBO = "dubbo" ) var ( // Make the connection can be shared. // It will create one connection for one address (ip+port) exchangeClientMap = new(sync.Map) exchangeLock = new(sync.Map) ) func init() { extension.SetProtocol(DUBBO, GetProtocol) } var dubboProtocol *DubboProtocol // DubboProtocol supports dubbo protocol. It implements Protocol interface for dubbo protocol. type DubboProtocol struct { protocol.BaseProtocol // It is store relationship about serviceKey(group/interface:version) and ExchangeServer // The ExchangeServer is introduced to replace of Server. Because Server is depend on getty directly. serverMap map[string]*remoting.ExchangeServer serverLock sync.Mutex } // NewDubboProtocol create a dubbo protocol. func NewDubboProtocol() *DubboProtocol { return &DubboProtocol{ BaseProtocol: protocol.NewBaseProtocol(), serverMap: make(map[string]*remoting.ExchangeServer), } } // Export export dubbo service. func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { url := invoker.GetURL() serviceKey := url.ServiceKey() exporter := NewDubboExporter(serviceKey, invoker, dp.ExporterMap()) dp.SetExporterMap(serviceKey, exporter) logger.Infof("[DUBBO Protocol] Export service: %s", url.String()) // start server dp.openServer(url) return exporter } // Refer create dubbo service reference. func (dp *DubboProtocol) Refer(url *common.URL) protocol.Invoker { exchangeClient := getExchangeClient(url) if exchangeClient == nil { logger.Warnf("can't dial the server: %+v", url.Location) return nil } invoker := NewDubboInvoker(url, exchangeClient) dp.SetInvokers(invoker) logger.Infof("[DUBBO Protocol] Refer service: %s", url.String()) return invoker } // Destroy destroy dubbo service. func (dp *DubboProtocol) Destroy() { logger.Infof("DubboProtocol destroy.") dp.BaseProtocol.Destroy() // stop server for key, server := range dp.serverMap { delete(dp.serverMap, key) server.Stop() } } func (dp *DubboProtocol) openServer(url *common.URL) { _, ok := dp.serverMap[url.Location] if !ok { _, ok := dp.ExporterMap().Load(url.ServiceKey()) if !ok { panic("[DubboProtocol]" + url.Key() + "is not existing") } dp.serverLock.Lock() _, ok = dp.serverMap[url.Location] if !ok { handler := func(invocation *invocation.RPCInvocation) protocol.RPCResult { return doHandleRequest(invocation) } srv := remoting.NewExchangeServer(url, getty.NewServer(url, handler)) dp.serverMap[url.Location] = srv srv.Start() } dp.serverLock.Unlock() } } // GetProtocol get a single dubbo protocol. func GetProtocol() protocol.Protocol { if dubboProtocol == nil { dubboProtocol = NewDubboProtocol() } return dubboProtocol } func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult { exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey()) result := protocol.RPCResult{} if exporter == nil { err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey()) logger.Errorf(err.Error()) result.Err = err // reply(session, p, hessian.PackageResponse) return result } invoker := exporter.(protocol.Exporter).GetInvoker() if invoker != nil { // FIXME ctx := rebuildCtx(rpcInvocation) invokeResult := invoker.Invoke(ctx, rpcInvocation) if err := invokeResult.Error(); err != nil { result.Err = invokeResult.Error() // p.Header.ResponseStatus = hessian.Response_OK // p.Body = hessian.NewResponse(nil, err, result.Attachments()) } else { result.Rest = invokeResult.Result() // p.Header.ResponseStatus = hessian.Response_OK // p.Body = hessian.NewResponse(res, nil, result.Attachments()) } result.Attrs = invokeResult.Attachments() } else { result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey()) } return result } func getExchangeClient(url *common.URL) *remoting.ExchangeClient { clientTmp, ok := exchangeClientMap.Load(url.Location) if !ok { var exchangeClientTmp *remoting.ExchangeClient func() { // lock for NewExchangeClient and store into map. _, loaded := exchangeLock.LoadOrStore(url.Location, 0x00) // unlock defer exchangeLock.Delete(url.Location) if loaded { // retry for 5 times. for i := 0; i < 5; i++ { if clientTmp, ok = exchangeClientMap.Load(url.Location); ok { break } else { // if cannot get, sleep a while. time.Sleep(time.Duration(i*100) * time.Millisecond) } } return } // todo set by config exchangeClientTmp = remoting.NewExchangeClient(url, getty.NewClient(getty.Options{ ConnectTimeout: 3 * time.Second, RequestTimeout: 3 * time.Second, }), 3*time.Second, false) // input store if exchangeClientTmp != nil { exchangeClientMap.Store(url.Location, exchangeClientTmp) } }() if exchangeClientTmp != nil { return exchangeClientTmp } } // cannot dial the server if clientTmp == nil { return nil } exchangeClient := clientTmp.(*remoting.ExchangeClient) exchangeClient.IncreaseActiveNumber() return exchangeClient } // rebuildCtx rebuild the context by attachment. // Once we decided to transfer more context's key-value, we should change this. // now we only support rebuild the tracing context func rebuildCtx(inv *invocation.RPCInvocation) context.Context { ctx := context.WithValue(context.Background(), constant.DubboCtxKey("attachment"), inv.Attachments()) // actually, if user do not use any opentracing framework, the err will not be nil. spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(filterContext(inv.Attachments()))) if err == nil { ctx = context.WithValue(ctx, constant.DubboCtxKey(constant.TracingRemoteSpanCtx), spanCtx) } return ctx }