in pkg/mds/server/server.go [190:315]
// MetadataSync serves one metadata-sync stream.
//
// For the lifetime of the stream it:
//   - wraps the stream in a DubboSyncClient that handles MetadataSyncRequest
//     messages coming from the data plane,
//   - registers a push callback (plus a subscription filter) on m.pusher under
//     a freshly generated client ID, and removes it again when the method returns,
//   - runs the receive loop in a separate goroutine.
//
// The method blocks until either the receive loop or a push reports its first
// outcome. It returns nil when that outcome is a graceful end of stream
// (io.EOF) and a gRPC status error with codes.Internal otherwise.
func (m *MdsServer) MetadataSync(stream mesh_proto.MDSSyncService_MetadataSyncServer) error {
	mesh := core_model.DefaultMesh // todo: mesh

	// Only the first outcome is consumed below. The one-slot buffer combined
	// with the non-blocking send in report guarantees that neither the receive
	// goroutine nor a push callback blocks forever once this method has returned.
	errChan := make(chan error, 1)
	report := func(err error) {
		select {
		case errChan <- err:
		default:
			// An outcome is already pending; later ones are dropped on purpose.
		}
	}

	clientID := uuid.NewString()
	metadataSyncStream := client.NewDubboSyncStream(stream)
	// DubboSyncClient is to handle MetadataSyncRequest from data plane
	metadataSyncClient := client.NewDubboSyncClient(
		log.WithName("client"),
		clientID,
		metadataSyncStream,
		&client.Callbacks{
			OnMetadataSyncRequestReceived: func(request *mesh_proto.MetadataSyncRequest) error {
				// when a request is received, invoke the push callback of this client
				m.pusher.InvokeCallback(
					core_mesh.MetaDataType,
					clientID,
					request,
					func(rawRequest interface{}, resourceList core_model.ResourceList) core_model.ResourceList {
						// only respond with the MetaData resources selected by
						// application name and, optionally, revision
						respMetadataList := &core_mesh.MetaDataResourceList{}
						req, ok := rawRequest.(*mesh_proto.MetadataSyncRequest)
						if !ok {
							return respMetadataList
						}
						metadataList, ok := resourceList.(*core_mesh.MetaDataResourceList)
						if !ok {
							return respMetadataList
						}
						for _, item := range metadataList.Items {
							// the application is matched by prefix on Spec.App
							if item.Spec == nil || !strings.HasPrefix(item.Spec.App, req.ApplicationName) {
								continue
							}
							// a non-empty revision narrows the match to exactly that revision
							if req.Revision != "" && req.Revision != item.Spec.Revision {
								continue
							}
							// item comes from a MetaDataResourceList, so it has the list's own item type
							_ = respMetadataList.AddItem(item)
						}
						return respMetadataList
					},
				)
				return nil
			},
		})

	m.pusher.AddCallback(
		core_mesh.MetaDataType,
		metadataSyncClient.ClientID(),
		func(items pusher.PushedItems) {
			resourceList := items.ResourceList()
			revision := items.Revision()
			metadataList, ok := resourceList.(*core_mesh.MetaDataResourceList)
			if !ok {
				return
			}
			err := metadataSyncClient.Send(metadataList, revision)
			if err == nil {
				return
			}
			if errors.Is(err, io.EOF) {
				log.Info("DubboSyncClient finished gracefully")
				report(nil)
				return
			}
			log.Error(err, "send metadata sync response failed", "metadataList", metadataList, "revision", revision)
			report(errors.Wrap(err, "DubboSyncClient send with an error"))
		},
		func(resourceList core_model.ResourceList) core_model.ResourceList {
			if resourceList.GetItemType() != core_mesh.MetaDataType {
				return nil
			}
			// only send Metadata which the client subscribed to; the result must be
			// a MetaDataResourceList, because that is the type the push callback
			// above asserts before sending
			newResourceList := &core_mesh.MetaDataResourceList{}
			subscribed := metadataSyncStream.SubscribedApplicationNames()
			for _, resource := range resourceList.GetItems() {
				metaData, ok := resource.(*core_mesh.MetaDataResource)
				if !ok {
					continue
				}
				for _, applicationName := range subscribed {
					// the application is matched by prefix on Spec.App, within the handled mesh
					if strings.HasPrefix(metaData.Spec.GetApp(), applicationName) && mesh == resource.GetMeta().GetMesh() {
						// metaData has the list's own item type
						_ = newResourceList.AddItem(metaData)
						break
					}
				}
			}
			return newResourceList
		},
	)
	// in the end, remove callback of this client
	defer m.pusher.RemoveCallback(core_mesh.MetaDataType, metadataSyncClient.ClientID())

	go func() {
		// Handle requests from client
		err := metadataSyncClient.HandleReceive()
		if err == nil || errors.Is(err, io.EOF) {
			log.Info("DubboSyncClient finished gracefully")
			report(nil)
			return
		}
		log.Error(err, "DubboSyncClient finished with an error")
		report(errors.Wrap(err, "DubboSyncClient finished with an error"))
	}()

	// Block until the first outcome is reported.
	err := <-errChan
	if err == nil {
		log.Info("MetadataSync finished gracefully")
		return nil
	}
	log.Error(err, "MetadataSync finished with an error")
	return status.Error(codes.Internal, err.Error())
}