pkg/core/registry/notify.go (148 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package registry
import (
"context"
"sync"
)
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/remoting"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/events"
)
const (
keySeparator = "-"
)
type AdminInstanceEvent struct {
Action remoting.EventType
Instance registry.ServiceInstance
Key string // store the key for Service.Key()
}
type NotifyListener struct {
manager.ResourceManager
dataplaneCache *sync.Map
eventWriter events.Emitter
ctx *ApplicationContext
}
func NewNotifyListener(
manager manager.ResourceManager,
cache *sync.Map,
writer events.Emitter,
grc *ApplicationContext,
) *NotifyListener {
return &NotifyListener{
manager,
cache,
writer,
grc,
}
}
func (l *NotifyListener) Notify(event *registry.ServiceEvent) {}
func (l *NotifyListener) NotifyAll(events []*registry.ServiceEvent, f func()) {}
func (l *NotifyListener) NotifyInstance(event *AdminInstanceEvent) {
switch event.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
if err := l.createOrUpdateDataplane(context.Background(), event.Key, event.Instance); err != nil {
return
}
case remoting.EventTypeDel:
if err := l.deleteDataplane(context.Background(), event.Key, event.Instance); err != nil {
return
}
}
}
func (l *NotifyListener) NotifyAllInstances(events []*AdminInstanceEvent, f func()) {
for _, event := range events {
l.NotifyInstance(event)
}
}
func (l *NotifyListener) deleteDataplane(ctx context.Context, app string, instance registry.ServiceInstance) error {
address := instance.GetAddress()
var revision string
if v, ok := l.ctx.allInstances.Load(app); ok {
instances := v.([]registry.ServiceInstance)
for _, instance := range instances {
if instance.GetAddress() == address {
revision = instance.GetMetadata()[constant.ExportedServicesRevisionPropertyName]
}
}
}
key := getDataplaneKey(app, revision)
l.dataplaneCache.Delete(key)
if l.eventWriter != nil {
go func() {
l.eventWriter.Send(events.ResourceChangedEvent{
Operation: events.Delete,
Type: mesh.DataplaneType,
Key: core_model.ResourceKey{
Name: key,
},
})
}()
}
return nil
}
func (l *NotifyListener) createOrUpdateDataplane(ctx context.Context, app string, instance registry.ServiceInstance) error {
address := instance.GetAddress()
var revision string
if v, ok := l.ctx.allInstances.Load(app); ok {
instances := v.([]registry.ServiceInstance)
for _, instance := range instances {
if instance.GetAddress() == address {
revision = instance.GetMetadata()[constant.ExportedServicesRevisionPropertyName]
break
}
}
}
key := getDataplaneKey(app, address)
dataplaneResource := mesh.NewDataplaneResource()
dataplaneResource.SetMeta(&resourceMetaObject{
Name: key,
Mesh: core_model.DefaultMesh,
})
dataplaneResource.Spec.Networking = &mesh_proto.Dataplane_Networking{}
dataplaneResource.Spec.Extensions = map[string]string{}
dataplaneResource.Spec.Extensions[mesh_proto.ApplicationName] = app
dataplaneResource.Spec.Extensions[mesh_proto.Revision] = revision
dataplaneResource.Spec.Networking.Address = address
ifaces, err := InboundInterfacesFor(ctx, instance)
if err != nil {
return err
}
ofaces, err := OutboundInterfacesFor(ctx, instance)
if err != nil {
return err
}
dataplaneResource.Spec.Networking.Inbound = ifaces
dataplaneResource.Spec.Networking.Outbound = ofaces
l.dataplaneCache.Store(key, dataplaneResource)
if l.eventWriter != nil {
go func() {
l.eventWriter.Send(events.ResourceChangedEvent{
Operation: events.Update,
Type: mesh.DataplaneType,
Key: core_model.MetaToResourceKey(dataplaneResource.GetMeta()),
})
}()
}
return nil
}
func InboundInterfacesFor(ctx context.Context, instance registry.ServiceInstance) ([]*mesh_proto.Dataplane_Networking_Inbound, error) {
var ifaces []*mesh_proto.Dataplane_Networking_Inbound
ifaces = append(ifaces, &mesh_proto.Dataplane_Networking_Inbound{
Port: uint32(instance.GetPort()),
})
return ifaces, nil
}
func OutboundInterfacesFor(ctx context.Context, instance registry.ServiceInstance) ([]*mesh_proto.Dataplane_Networking_Outbound, error) {
var outbounds []*mesh_proto.Dataplane_Networking_Outbound
return outbounds, nil
}
func getDataplaneKey(app string, addr string) string {
return app + keySeparator + addr
}