func()

in pkg/mds/server/server.go [190:315]


func (m *MdsServer) MetadataSync(stream mesh_proto.MDSSyncService_MetadataSyncServer) error {
	mesh := core_model.DefaultMesh // todo: mesh
	errChan := make(chan error)

	clientID := uuid.NewString()
	metadataSyncStream := client.NewDubboSyncStream(stream)
	// DubboSyncClient is to handle MetaSyncRequest from data plane
	metadataSyncClient := client.NewDubboSyncClient(
		log.WithName("client"),
		clientID,
		metadataSyncStream,
		&client.Callbacks{
			OnMetadataSyncRequestReceived: func(request *mesh_proto.MetadataSyncRequest) error {
				// when received request, invoke callback
				m.pusher.InvokeCallback(
					core_mesh.MetaDataType,
					clientID,
					request,
					func(rawRequest interface{}, resourceList core_model.ResourceList) core_model.ResourceList {
						req := rawRequest.(*mesh_proto.MetadataSyncRequest)
						metadataList := resourceList.(*core_mesh.MetaDataResourceList)

						// only response the target MetaData Resource by application name or revision
						respMetadataList := &core_mesh.MetaDataResourceList{}
						for _, item := range metadataList.Items {
							// MetaData.Name = AppName.Revision, so we need to check MedaData.Name has prefix of AppName
							if item.Spec != nil && strings.HasPrefix(item.Spec.App, req.ApplicationName) {
								if req.Revision != "" {
									// revision is not empty, response the Metadata with application name and target revision
									if req.Revision == item.Spec.Revision {
										_ = respMetadataList.AddItem(item)
									}
								} else {
									// revision is empty, response the Metadata with target application name
									_ = respMetadataList.AddItem(item)
								}
							}
						}

						return respMetadataList
					},
				)
				return nil
			},
		})

	m.pusher.AddCallback(
		core_mesh.MetaDataType,
		metadataSyncClient.ClientID(),
		func(items pusher.PushedItems) {
			resourceList := items.ResourceList()
			revision := items.Revision()
			metadataList, ok := resourceList.(*core_mesh.MetaDataResourceList)
			if !ok {
				return
			}

			err := metadataSyncClient.Send(metadataList, revision)
			if err != nil {
				if errors.Is(err, io.EOF) {
					log.Info("DubboSyncClient finished gracefully")
					errChan <- nil
					return
				}

				log.Error(err, "send metadata sync response failed", "metadataList", metadataList, "revision", revision)
				errChan <- errors.Wrap(err, "DubboSyncClient send with an error")
			}
		},
		func(resourceList core_model.ResourceList) core_model.ResourceList {
			if resourceList.GetItemType() != core_mesh.MetaDataType {
				return nil
			}

			// only send Metadata which client subscribed
			newResourceList := &core_mesh.MeshResourceList{}
			for _, resource := range resourceList.GetItems() {
				expected := false
				metaData := resource.(*core_mesh.MetaDataResource)
				for _, applicationName := range metadataSyncStream.SubscribedApplicationNames() {
					// MetaData.Name = AppName.Revision, so we need to check MedaData.Name has prefix of AppName
					if strings.HasPrefix(metaData.Spec.GetApp(), applicationName) && mesh == resource.GetMeta().GetMesh() {
						expected = true
						break
					}
				}

				if expected {
					// find
					_ = newResourceList.AddItem(resource)
				}
			}

			return newResourceList
		},
	)

	// in the end, remove callback of this client
	defer m.pusher.RemoveCallback(core_mesh.MetaDataType, metadataSyncClient.ClientID())

	go func() {
		// Handle requests from client
		err := metadataSyncClient.HandleReceive()
		if errors.Is(err, io.EOF) {
			log.Info("DubboSyncClient finished gracefully")
			errChan <- nil
			return
		}

		log.Error(err, "DubboSyncClient finished with an error")
		errChan <- errors.Wrap(err, "DubboSyncClient finished with an error")
	}()

	for {
		select {
		case err := <-errChan:
			if err == nil {
				log.Info("MetadataSync finished gracefully")
				return nil
			}

			log.Error(err, "MetadataSync finished with an error")
			return status.Error(codes.Internal, err.Error())
		}
	}
}