metadata/client.go (175 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package metadata
import (
"context"
"encoding/json"
"reflect"
)
import (
"github.com/dubbogo/gost/log/logger"
perrors "github.com/pkg/errors"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metadata/info"
tripleapi "dubbo.apache.org/dubbo-go/v3/metadata/triple_api/proto"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
"dubbo.apache.org/dubbo-go/v3/registry"
)
// defaultTimeout is the value GetMetadataFromRpc stores under
// constant.TimeoutKey on the metadata service URL before referring it.
const defaultTimeout = "5s" // presumably parsed as a duration string (five seconds) — confirm against the protocol implementation
// GetMetadataFromMetadataReport looks up the metadata of the given revision in
// the metadata report returned by GetMetadataReport, keyed by the service name
// of instance.
//
// It returns an error when no metadata report is available; otherwise it
// returns whatever the report's GetAppMetadata yields.
func GetMetadataFromMetadataReport(revision string, instance registry.ServiceInstance) (*info.MetadataInfo, error) {
	if metadataReport := GetMetadataReport(); metadataReport != nil {
		return metadataReport.GetAppMetadata(instance.GetServiceName(), revision)
	}
	return nil, perrors.New("no metadata report instance found,please check ")
}
// GetMetadataFromRpc fetches the metadata of the given revision by calling the
// metadata service exposed by instance itself.
//
// The metadata service URL is derived from the instance metadata. An error is
// returned when no URL can be built from that metadata, when the protocol
// yields no invoker for the URL, or when the remote call fails. The invoker is
// destroyed before the function returns.
func GetMetadataFromRpc(revision string, instance registry.ServiceInstance) (*info.MetadataInfo, error) {
	url := buildStandardMetadataServiceURL(instance)
	if url == nil {
		// buildStandardMetadataServiceURL returns nil when the instance metadata
		// carries no protocol; using the nil URL below would panic.
		return nil, perrors.New("can not build metadata service url for instance host: " + instance.GetHost())
	}
	url.SetParam(constant.TimeoutKey, defaultTimeout)

	invoker := extension.GetProtocol(url.Protocol).Refer(url)
	if invoker == nil { // can't connect instance
		return nil, perrors.New("can not connect to remote metadata service host: " + url.Ip)
	}
	// Register the cleanup as soon as the invoker exists so every later exit
	// path releases it.
	defer invoker.Destroy()

	// Only triple instances that advertise the V2 metadata version use the V2
	// client; everything else goes through the V1 client.
	var remoteService remoteMetadataService
	if url.Protocol == constant.TriProtocol && instance.GetMetadata()[constant.MetadataVersion] == constant.MetadataServiceV2Version {
		remoteService = &triMetadataServiceV2{invoker: invoker}
	} else {
		remoteService = &remoteMetadataServiceV1{invoker: invoker}
	}
	return remoteService.getMetadataInfo(context.Background(), revision)
}
// remoteMetadataService abstracts the remote call that fetches the metadata of
// one application revision, so GetMetadataFromRpc can use the V1 and V2
// metadata service clients interchangeably.
type remoteMetadataService interface {
	// getMetadataInfo returns the metadata published under revision, or the
	// error produced by the remote invocation.
	getMetadataInfo(ctx context.Context, revision string) (*info.MetadataInfo, error)
}
// triMetadataServiceV2 is the remoteMetadataService implementation that
// GetMetadataFromRpc selects when the URL protocol is constant.TriProtocol and
// the instance metadata version equals constant.MetadataServiceV2Version.
type triMetadataServiceV2 struct {
// invoker is the invoker through which the remote method is called.
invoker protocol.Invoker
}
// getMetadataInfo invokes the remote "GetMetadataInfo" method with a
// MetadataRequest for revision and converts the MetadataInfoV2 reply into an
// *info.MetadataInfo.
//
// ctx is forwarded to the invoker. Any error from building the invocation or
// from the invocation result is returned with a nil metadata value.
func (m *triMetadataServiceV2) getMetadataInfo(ctx context.Context, revision string) (*info.MetadataInfo, error) {
	const methodName = "GetMetadataInfo"

	req := &tripleapi.MetadataRequest{Revision: revision}
	// The reply object is handed to the invocation so the response is decoded
	// into it.
	metadataInfo := &tripleapi.MetadataInfoV2{}
	inv, err := generateInvocation(m.invoker.GetURL(), methodName, req, metadataInfo, constant.CallUnary)
	if err != nil {
		return nil, err
	}

	res := m.invoker.Invoke(ctx, inv)
	if err := res.Error(); err != nil {
		logger.Errorf("could not get the metadata info from remote provider: %v", err)
		return nil, err
	}
	return convertMetadataInfoV2(metadataInfo), nil
}
// convertMetadataInfoV2 maps a triple MetadataInfoV2 message onto the internal
// info.MetadataInfo model.
//
// Each entry of v2.Services is copied field by field into a new
// info.ServiceInfo stored under the same key. The resulting Revision is taken
// from the message's Version field. The returned Services map is never nil.
func convertMetadataInfoV2(v2 *tripleapi.MetadataInfoV2) *info.MetadataInfo {
	services := map[string]*info.ServiceInfo{}
	for key, svc := range v2.Services {
		services[key] = &info.ServiceInfo{
			Name:     svc.Name,
			Group:    svc.Group,
			Version:  svc.Version,
			Protocol: svc.Protocol,
			Path:     svc.Path,
			Params:   svc.Params,
		}
	}
	return &info.MetadataInfo{
		App:      v2.App,
		Revision: v2.Version,
		Services: services,
	}
}
// generateInvocation builds the invocation used to call methodName on the
// metadata service addressed by u, with req as the single argument and resp as
// the reply target.
//
// For URLs whose protocol is constant.TriProtocol the invocation carries a
// timeout attachment of "5000", a retries attachment of "2", the raw parameter
// values (req, plus resp when it is non-nil) and the call type attribute set to
// callType. For every other protocol it carries the async attachment set to
// "false" and req as a reflect.Value parameter; callType is not used.
//
// The returned error is always nil.
func generateInvocation(u *common.URL, methodName string, req any, resp any, callType string) (protocol.Invocation, error) {
	if u.Protocol != constant.TriProtocol {
		reqValue := reflect.ValueOf(req)
		return invocation.NewRPCInvocationWithOptions(
			invocation.WithMethodName(methodName),
			invocation.WithArguments([]any{reqValue.Interface()}),
			invocation.WithReply(resp),
			invocation.WithAttachments(map[string]any{constant.AsyncKey: "false"}),
			invocation.WithParameterValues([]reflect.Value{reqValue}),
		), nil
	}

	// Triple path: the raw values hold the request and, when present, the reply.
	rawValues := []any{req}
	if resp != nil {
		rawValues = append(rawValues, resp)
	}
	triInv := invocation.NewRPCInvocationWithOptions(
		invocation.WithMethodName(methodName),
		invocation.WithAttachment(constant.TimeoutKey, "5000"),
		invocation.WithAttachment(constant.RetriesKey, "2"),
		invocation.WithArguments([]any{req}),
		invocation.WithReply(resp),
		invocation.WithParameterRawValues(rawValues),
	)
	triInv.SetAttribute(constant.CallTypeKey, callType)
	return triInv, nil
}
// remoteMetadataServiceV1 is the remoteMetadataService implementation that
// GetMetadataFromRpc falls back to whenever the triple V2 client is not
// selected.
type remoteMetadataServiceV1 struct {
// invoker is the invoker through which the remote method is called.
invoker protocol.Invoker
}
// getMetadataInfo invokes the remote "getMetadataInfo" method with revision as
// its argument and returns the resulting *info.MetadataInfo.
//
// ctx is forwarded to the invoker. The reply object passed to the invocation
// is returned when its Services were populated; otherwise the value carried by
// the invocation result is used. An error is returned when building the
// invocation fails, when the invocation reports an error, or when the fallback
// result is nil or not a *info.MetadataInfo.
func (m *remoteMetadataServiceV1) getMetadataInfo(ctx context.Context, revision string) (*info.MetadataInfo, error) {
	const methodName = "getMetadataInfo"

	metadataInfo := &info.MetadataInfo{}
	inv, err := generateInvocation(m.invoker.GetURL(), methodName, revision, metadataInfo, constant.CallUnary)
	if err != nil {
		return nil, err
	}

	res := m.invoker.Invoke(ctx, inv)
	if err := res.Error(); err != nil {
		logger.Errorf("could not get the metadata info from remote provider: %v", err)
		return nil, err
	}
	if metadataInfo.Services != nil {
		return metadataInfo, nil
	}

	// The reply was not filled in, so fall back to the invocation result. Use a
	// checked assertion: an unchecked one panics on a nil or unexpected result.
	result, ok := res.Result().(*info.MetadataInfo)
	if !ok || result == nil {
		return nil, perrors.Errorf("unexpected metadata info result from remote provider: %T", res.Result())
	}
	return result, nil
}
// buildStandardMetadataServiceURL assembles the URL of the metadata service
// exposed by ins from the URL parameters stored in the instance metadata.
//
// It returns nil when those parameters carry no protocol. Otherwise the URL
// uses the instance host, the port and protocol from the parameters, the
// constant.MetadataServiceName path and interface, and the instance service
// name as group. For the triple protocol it additionally sets the client info
// attribute and the method list, then either switches path, version and
// interface to the V2 metadata service (dropping the serialization parameter)
// or sets the serialization parameter to constant.Hessian2Serialization.
func buildStandardMetadataServiceURL(ins registry.ServiceInstance) *common.URL {
	rawParams := getMetadataServiceUrlParams(ins)
	protocolName := rawParams[constant.ProtocolKey]
	if protocolName == "" {
		return nil
	}

	appName := ins.GetServiceName()
	hostIP := ins.GetHost()
	metadataVersion := ins.GetMetadata()[constant.MetadataVersion]

	// common.WithParams takes multi-valued parameters, so wrap every value in a
	// single-element slice.
	urlParams := make(map[string][]string, len(rawParams))
	for key, value := range rawParams {
		urlParams[key] = []string{value}
	}

	metadataURL := common.NewURLWithOptions(
		common.WithIp(hostIP),
		common.WithPath(constant.MetadataServiceName),
		common.WithProtocol(protocolName),
		common.WithPort(rawParams[constant.PortKey]),
		common.WithParams(urlParams),
		common.WithParamsValue(constant.GroupKey, appName),
		common.WithParamsValue(constant.InterfaceKey, constant.MetadataServiceName),
	)
	if protocolName != constant.TriProtocol {
		return metadataURL
	}

	metadataURL.SetAttribute(constant.ClientInfoKey, "info")
	metadataURL.Methods = []string{"GetMetadataInfo", "getMetadataInfo"}
	switch metadataVersion {
	case constant.MetadataServiceV2Version:
		metadataURL.Path = constant.MetadataServiceV2Name
		metadataURL.SetParam(constant.VersionKey, metadataVersion)
		metadataURL.SetParam(constant.InterfaceKey, constant.MetadataServiceV2Name)
		metadataURL.DelParam(constant.SerializationKey)
	default:
		metadataURL.SetParam(constant.SerializationKey, constant.Hessian2Serialization)
	}
	return metadataURL
}
// getMetadataServiceUrlParams decodes the metadata service URL parameters
// stored in the instance metadata under
// constant.MetadataServiceURLParamsPropertyName into a map.
//
// The stored value is decoded as JSON into a map[string]string, so it has to
// be a flat object of string values, for example:
//
//	{"timeout":"10000","version":"1.0.0","release":"2.7.6","port":"20880"}
//
// When the property is missing or empty an empty map is returned. A decoding
// failure is logged and whatever was decoded so far is returned; the caller is
// expected to check for the keys it needs.
func getMetadataServiceUrlParams(ins registry.ServiceInstance) map[string]string {
	res := make(map[string]string, 2)
	// A missing key yields "", so one length check covers both absent and empty.
	str := ins.GetMetadata()[constant.MetadataServiceURLParamsPropertyName]
	if len(str) == 0 {
		return res
	}
	if err := json.Unmarshal([]byte(str), &res); err != nil {
		// The format needs a verb for err; without one the error is rendered as
		// a "%!(EXTRA ...)" suffix.
		logger.Errorf("could not parse the metadata service url parameters to map: %v", err)
	}
	return res
}