metadata/metadata_service.go (306 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package metadata
import (
"context"
"strconv"
"strings"
)
import (
"github.com/dubbogo/gost/log/logger"
perrors "github.com/pkg/errors"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metadata/info"
tripleapi "dubbo.apache.org/dubbo-go/v3/metadata/triple_api/proto"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/protocolwrapper"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
)
// version will be used by Version func
const (
version = "1.0.0"
allMatch = "*"
)
// MetadataService is used to define meta data related behaviors
// usually the implementation should be singleton
type MetadataService interface {
// GetExportedURLs will get the target exported url in metadata, the url should be unique
GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]*common.URL, error)
// GetExportedServiceURLs will return exported service urls
GetExportedServiceURLs() ([]*common.URL, error)
// GetSubscribedURLs will get the exported urls in metadata
GetSubscribedURLs() ([]*common.URL, error)
Version() (string, error)
// GetMetadataInfo will return metadata info
GetMetadataInfo(revision string) (*info.MetadataInfo, error)
// GetMetadataServiceURL will return the url of metadata service
GetMetadataServiceURL() (*common.URL, error)
}
// DefaultMetadataService is store and query the metadata info in memory when each service registry
type DefaultMetadataService struct {
metadataMap map[string]*info.MetadataInfo
metadataUrl *common.URL
}
func (mts *DefaultMetadataService) setMetadataServiceURL(url *common.URL) {
mts.metadataUrl = url
}
// GetExportedURLs get all exported urls
func (mts *DefaultMetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]*common.URL, error) {
all, err := mts.GetExportedServiceURLs()
if err != nil {
return nil, err
}
urls := make([]*common.URL, 0)
for _, url := range all {
if (url.Interface() == serviceInterface || serviceInterface == allMatch) &&
(url.Group() == group || group == allMatch) &&
(url.Protocol == protocol || protocol == allMatch) &&
(url.Version() == version || version == allMatch) {
urls = append(urls, url)
}
}
return urls, nil
}
// GetMetadataInfo can get metadata in memory
func (mts *DefaultMetadataService) GetMetadataInfo(revision string) (*info.MetadataInfo, error) {
if revision == "" {
return nil, nil
}
for _, metadataInfo := range mts.metadataMap {
if metadataInfo.Revision == revision {
return metadataInfo, nil
}
}
logger.Warnf("metadata not found for revision: %s", revision)
return nil, nil
}
// GetExportedServiceURLs get exported service urls
func (mts *DefaultMetadataService) GetExportedServiceURLs() ([]*common.URL, error) {
urls := make([]*common.URL, 0)
for _, metadataInfo := range mts.metadataMap {
urls = append(urls, metadataInfo.GetExportedServiceURLs()...)
}
return urls, nil
}
// Version will return the version of metadata service
func (mts *DefaultMetadataService) Version() (string, error) {
return version, nil
}
// GetMetadataServiceURL get url of MetadataService
func (mts *DefaultMetadataService) GetMetadataServiceURL() (*common.URL, error) {
return mts.metadataUrl, nil
}
func (mts *DefaultMetadataService) GetSubscribedURLs() ([]*common.URL, error) {
urls := make([]*common.URL, 0)
for _, metadataInfo := range mts.metadataMap {
urls = append(urls, metadataInfo.GetSubscribedURLs()...)
}
return urls, nil
}
// MethodMapper only for rename exported function, for example: rename the function GetMetadataInfo to getMetadataInfo
func (mts *DefaultMetadataService) MethodMapper() map[string]string {
return map[string]string{
"GetExportedURLs": "getExportedURLs",
"GetMetadataInfo": "getMetadataInfo",
}
}
// serviceExporter export MetadataService
type serviceExporter struct {
opts *Options
service MetadataService
protocolExporter protocol.Exporter
v2Exporter protocol.Exporter
}
// Export will export the metadataService
func (e *serviceExporter) Export() error {
var port string
if e.opts.port == 0 {
port = common.GetRandomPort("")
} else {
port = strconv.Itoa(e.opts.port)
}
if e.opts.protocol == constant.DefaultProtocol {
err := e.exportDubbo(port)
if err != nil {
return err
}
} else {
e.exportTripleV1(port)
// v2 only supports triple protocol
e.exportV2(port)
}
return nil
}
func (e *serviceExporter) exportDubbo(port string) error {
version, _ := e.service.Version()
ivkURL := common.NewURLWithOptions(
common.WithPath(constant.MetadataServiceName),
common.WithProtocol(constant.DefaultProtocol),
common.WithPort(port),
common.WithParamsValue(constant.GroupKey, e.opts.appName),
common.WithParamsValue(constant.SerializationKey, constant.Hessian2Serialization),
common.WithParamsValue(constant.ReleaseKey, constant.Version),
common.WithParamsValue(constant.VersionKey, version),
common.WithParamsValue(constant.InterfaceKey, constant.MetadataServiceName),
common.WithParamsValue(constant.BeanNameKey, constant.SimpleMetadataServiceName),
common.WithParamsValue(constant.MetadataTypeKey, e.opts.metadataType),
common.WithParamsValue(constant.SideKey, constant.SideProvider),
)
methods, err := common.ServiceMap.Register(ivkURL.Interface(), ivkURL.Protocol, ivkURL.Group(), ivkURL.Version(), e.service)
if err != nil {
formatErr := perrors.Errorf("The service %v needExport the protocol %v error! Error message is %v.",
ivkURL.Interface(), ivkURL.Protocol, err.Error())
logger.Errorf(formatErr.Error())
return formatErr
}
ivkURL.Methods = strings.Split(methods, ",")
proxyFactory := extension.GetProxyFactory("")
invoker := proxyFactory.GetInvoker(ivkURL)
e.protocolExporter = extension.GetProtocol(ivkURL.Protocol).Export(invoker)
e.service.(*DefaultMetadataService).setMetadataServiceURL(ivkURL)
return nil
}
func (e *serviceExporter) exportTripleV1(port string) {
version, _ := e.service.Version()
svc := &MetadataServiceV1{delegate: e.service}
ivkURL := common.NewURLWithOptions(
common.WithPath(constant.MetadataServiceName),
common.WithProtocol(constant.TriProtocol),
common.WithPort(port),
common.WithParamsValue(constant.GroupKey, e.opts.appName),
common.WithParamsValue(constant.VersionKey, version),
common.WithInterface(constant.MetadataServiceName),
common.WithMethods(strings.Split("getMetadataInfo,GetMetadataInfo", ",")),
common.WithParamsValue(constant.SerializationKey, constant.Hessian2Serialization),
common.WithAttribute(constant.ServiceInfoKey, &MetadataService_ServiceInfo),
common.WithAttribute(constant.RpcServiceKey, svc),
)
proxyFactory := extension.GetProxyFactory("")
invoker := proxyFactory.GetInvoker(ivkURL)
e.protocolExporter = extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
e.service.(*DefaultMetadataService).setMetadataServiceURL(invoker.GetURL())
}
func (e *serviceExporter) exportV2(port string) {
v2 := &MetadataServiceV2{delegate: e.service}
// v2 only supports triple protocol
ivkURL := common.NewURLWithOptions(
common.WithPath(constant.MetadataServiceV2Name),
common.WithProtocol(constant.TriProtocol),
common.WithPort(port),
common.WithParamsValue(constant.GroupKey, e.opts.appName),
common.WithParamsValue(constant.VersionKey, "2.0.0"),
common.WithInterface(constant.MetadataServiceV2Name),
common.WithMethods(strings.Split("getMetadataInfo,GetMetadataInfo", ",")),
common.WithAttribute(constant.ServiceInfoKey, &MetadataServiceV2_ServiceInfo),
common.WithAttribute(constant.RpcServiceKey, v2),
)
proxyFactory := extension.GetProxyFactory("")
invoker := proxyFactory.GetInvoker(ivkURL)
e.v2Exporter = extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
// do not set, because it will override MetadataService
//exporter.metadataService.SetMetadataServiceURL(ivkURL)
}
// serviceInvoker, if base on server.infoInvoker will cause cycle dependency, so we need to use this way
type serviceInvoker struct {
*protocol.BaseInvoker
invoke func(context context.Context, invocation protocol.Invocation) protocol.Result
}
func (si serviceInvoker) Invoke(context context.Context, invocation protocol.Invocation) protocol.Result {
return si.invoke(context, invocation)
}
type MetadataServiceHandler interface {
GetMetadataInfo(ctx context.Context, revision string) (*info.MetadataInfo, error)
}
type MetadataServiceV1 struct {
delegate MetadataService
}
func (mtsV1 *MetadataServiceV1) GetMetadataInfo(ctx context.Context, revision string) (*info.MetadataInfo, error) {
metadataInfo, err := mtsV1.delegate.GetMetadataInfo(revision)
if err != nil {
return nil, err
}
return metadataInfo, nil
}
func convertV1(serviceInfos map[string]*info.ServiceInfo) map[string]*tripleapi.ServiceInfo {
serviceInfoV1s := make(map[string]*tripleapi.ServiceInfo, len(serviceInfos))
for k, i := range serviceInfos {
serviceInfo := &tripleapi.ServiceInfo{
Name: i.Name,
Group: i.Group,
Version: i.Version,
Protocol: i.Protocol,
Port: 0,
Path: i.Path,
Params: i.Params,
}
serviceInfoV1s[k] = serviceInfo
}
return serviceInfoV1s
}
// MetadataServiceV2Handler is an implementation of the org.apache.dubbo.metadata.MetadataServiceV2 service.
type MetadataServiceV2Handler interface {
GetMetadataInfo(context.Context, *tripleapi.MetadataRequest) (*tripleapi.MetadataInfoV2, error)
}
type MetadataServiceV2 struct {
delegate MetadataService
}
func (mtsV2 *MetadataServiceV2) GetMetadataInfo(ctx context.Context, req *tripleapi.MetadataRequest) (*tripleapi.MetadataInfoV2, error) {
metadataInfo, err := mtsV2.delegate.GetMetadataInfo(req.GetRevision())
if err != nil {
return nil, err
}
return &tripleapi.MetadataInfoV2{
App: metadataInfo.App,
Version: metadataInfo.Revision,
Services: convertV2(metadataInfo.Services),
}, err
}
func convertV2(serviceInfos map[string]*info.ServiceInfo) map[string]*tripleapi.ServiceInfoV2 {
serviceInfoV2s := make(map[string]*tripleapi.ServiceInfoV2, len(serviceInfos))
for k, serviceInfo := range serviceInfos {
serviceInfoV2 := &tripleapi.ServiceInfoV2{
Name: serviceInfo.Name,
Group: serviceInfo.Group,
Version: serviceInfo.Version,
Protocol: serviceInfo.Protocol,
Port: 0,
Path: serviceInfo.Path,
Params: serviceInfo.Params,
}
serviceInfoV2s[k] = serviceInfoV2
}
return serviceInfoV2s
}
var MetadataService_ServiceInfo = common.ServiceInfo{
InterfaceName: "org.apache.dubbo.metadata.MetadataService",
ServiceType: (*MetadataServiceHandler)(nil),
Methods: []common.MethodInfo{
{
Name: "getMetadataInfo",
Type: constant.CallUnary,
ReqInitFunc: func() any {
return new(string)
},
MethodFunc: func(ctx context.Context, args []any, handler any) (any, error) {
revision := args[0].(*string)
res, err := handler.(MetadataServiceHandler).GetMetadataInfo(ctx, *revision)
return res, err
},
},
},
}
var MetadataServiceV2_ServiceInfo = common.ServiceInfo{
InterfaceName: "org.apache.dubbo.metadata.MetadataServiceV2",
ServiceType: (*MetadataServiceV2Handler)(nil),
Methods: []common.MethodInfo{
{
Name: "GetMetadataInfo",
Type: constant.CallUnary,
ReqInitFunc: func() any {
return new(tripleapi.MetadataRequest)
},
MethodFunc: func(ctx context.Context, args []any, handler any) (any, error) {
req := args[0].(*tripleapi.MetadataRequest)
res, err := handler.(MetadataServiceV2Handler).GetMetadataInfo(ctx, req)
if err != nil {
return nil, err
}
return triple_protocol.NewResponse(res), nil
},
},
{
Name: "getMetadataInfo",
Type: constant.CallUnary,
ReqInitFunc: func() any {
return new(tripleapi.MetadataRequest)
},
MethodFunc: func(ctx context.Context, args []any, handler any) (any, error) {
req := args[0].(*tripleapi.MetadataRequest)
res, err := handler.(MetadataServiceV2Handler).GetMetadataInfo(ctx, req)
if err != nil {
return nil, err
}
return triple_protocol.NewResponse(res), nil
},
},
},
}