in pkg/mds/server/server.go [317:431]
// MappingSync serves one MappingSync stream.
//
// It wraps the stream in a DubboSyncClient identified by a freshly generated
// client ID, registers a push callback for Mapping resources under that ID,
// and starts a goroutine that handles incoming requests. The call then blocks
// until either the receive goroutine or the push callback reports that the
// stream has ended. The push callback is removed before the method returns.
//
// An io.EOF termination is treated as graceful and yields a nil error; any
// other failure is returned as a gRPC status with codes.Internal.
func (m *MdsServer) MappingSync(stream mesh_proto.MDSSyncService_MappingSyncServer) error {
	mesh := core_model.DefaultMesh // todo: mesh

	// errChan is read exactly once, at the bottom of this method. It has a
	// one-slot buffer and is only written through report, whose send never
	// blocks: a producer that finishes after this method has returned (the
	// receive goroutine, or a push callback that is still running) must not
	// hang forever on a channel nobody reads any more.
	errChan := make(chan error, 1)
	report := func(err error) {
		select {
		case errChan <- err:
		default:
			// A termination result is already pending; the first one wins.
		}
	}

	clientID := uuid.NewString()
	mappingSyncStream := client.NewDubboSyncStream(stream)
	// DubboSyncClient handles the MappingSyncRequest messages sent by the data plane.
	mappingSyncClient := client.NewDubboSyncClient(
		log.WithName("client"),
		clientID,
		mappingSyncStream,
		&client.Callbacks{
			OnMappingSyncRequestReceived: func(request *mesh_proto.MappingSyncRequest) error {
				// On every received request, invoke this client's push callback.
				m.pusher.InvokeCallback(
					core_mesh.MappingType,
					clientID,
					request,
					func(rawRequest interface{}, resourceList core_model.ResourceList) core_model.ResourceList {
						// Only answer with the Mapping resources whose interface
						// name matches the one named in the request.
						respMappingList := &core_mesh.MappingResourceList{}
						req, ok := rawRequest.(*mesh_proto.MappingSyncRequest)
						if !ok {
							// Unexpected request type: answer with nothing instead of panicking.
							return respMappingList
						}
						mappingList, ok := resourceList.(*core_mesh.MappingResourceList)
						if !ok {
							// Unexpected list type: answer with nothing instead of panicking.
							return respMappingList
						}
						for _, item := range mappingList.Items {
							if item.Spec != nil && req.InterfaceName == item.Spec.InterfaceName {
								// item was taken from a MappingResourceList, so it is
								// expected to be accepted by a list of the same type.
								_ = respMappingList.AddItem(item)
							}
						}
						return respMappingList
					},
				)
				return nil
			},
		})

	m.pusher.AddCallback(
		core_mesh.MappingType,
		mappingSyncClient.ClientID(),
		// Push callback: forwards the pushed Mapping list to the client.
		func(items pusher.PushedItems) {
			resourceList := items.ResourceList()
			revision := items.Revision()
			mappingList, ok := resourceList.(*core_mesh.MappingResourceList)
			if !ok {
				return
			}
			err := mappingSyncClient.Send(mappingList, revision)
			if err == nil {
				return
			}
			if errors.Is(err, io.EOF) {
				log.Info("DubboSyncClient finished gracefully")
				report(nil)
				return
			}
			log.Error(err, "send mapping sync response failed", "mappingList", mappingList, "revision", revision)
			report(errors.Wrap(err, "DubboSyncClient send with an error"))
		},
		// Push filter: keeps only the Mappings this client has subscribed to.
		func(resourceList core_model.ResourceList) core_model.ResourceList {
			if resourceList.GetItemType() != core_mesh.MappingType {
				return nil
			}
			// Read the subscription list once per filter run rather than once
			// per resource.
			subscribed := mappingSyncStream.SubscribedInterfaceNames()
			// The result must be a MappingResourceList: the push callback above
			// only accepts that type, and the items being added are Mappings.
			newResourceList := &core_mesh.MappingResourceList{}
			for _, resource := range resourceList.GetItems() {
				for _, interfaceName := range subscribed {
					if interfaceName == resource.GetMeta().GetName() && mesh == resource.GetMeta().GetMesh() {
						// The item type was checked above, so the resource is
						// expected to be accepted by a Mapping list.
						_ = newResourceList.AddItem(resource)
						break
					}
				}
			}
			return newResourceList
		},
	)
	// In the end, remove the callback of this client.
	defer m.pusher.RemoveCallback(core_mesh.MappingType, mappingSyncClient.ClientID())

	go func() {
		// Handle requests from the client until the receive loop ends.
		err := mappingSyncClient.HandleReceive()
		if err == nil || errors.Is(err, io.EOF) {
			log.Info("DubboSyncClient finished gracefully")
			report(nil)
			return
		}
		log.Error(err, "DubboSyncClient finished with an error")
		report(errors.Wrap(err, "DubboSyncClient finished with an error"))
	}()

	// Block until the first termination result arrives.
	err := <-errChan
	if err == nil {
		log.Info("MappingSync finished gracefully")
		return nil
	}
	log.Error(err, "MappingSync finished with an error")
	return status.Error(codes.Internal, err.Error())
}