func()

in pkg/mds/server/server.go [317:431]


// MappingSync serves a single MappingSync stream of the MDS sync service.
//
// For the given stream it creates a DubboSyncClient identified by a freshly
// generated client ID, registers a pusher callback for Mapping resources under
// that ID, and starts a goroutine that handles the requests received from the
// client. It then blocks until either the receive loop or a push reports an
// outcome. The pusher callback is removed before the method returns.
//
// It returns nil when the stream finished with io.EOF, and a gRPC status error
// with codes.Internal carrying the underlying message otherwise.
func (m *MdsServer) MappingSync(stream mesh_proto.MDSSyncService_MappingSyncServer) error {
	mesh := core_model.DefaultMesh // todo: mesh

	// errChan carries the first terminal outcome (nil means graceful). It has
	// room for one value and is only written through report, which never
	// blocks: this method reads a single value and returns, so any later
	// sender (the receive goroutine or a push callback still in flight) must
	// not be left waiting on a channel nobody reads anymore.
	errChan := make(chan error, 1)
	report := func(err error) {
		select {
		case errChan <- err:
		default:
			// An outcome is already pending; later ones are dropped.
		}
	}

	clientID := uuid.NewString()
	mappingSyncStream := client.NewDubboSyncStream(stream)
	// DubboSyncClient is to handle MappingSyncRequest from data plane
	mappingSyncClient := client.NewDubboSyncClient(
		log.WithName("client"),
		clientID,
		mappingSyncStream,
		&client.Callbacks{
			OnMappingSyncRequestReceived: func(request *mesh_proto.MappingSyncRequest) error {
				// when received request, invoke callback
				m.pusher.InvokeCallback(
					core_mesh.MappingType,
					clientID,
					request,
					func(rawRequest interface{}, resourceList core_model.ResourceList) core_model.ResourceList {
						// only response the target Mapping Resource by interface name
						respMappingList := &core_mesh.MappingResourceList{}

						// Checked assertions: an unexpected type yields an empty
						// response instead of a panic inside the pusher.
						req, ok := rawRequest.(*mesh_proto.MappingSyncRequest)
						if !ok {
							return respMappingList
						}
						mappingList, ok := resourceList.(*core_mesh.MappingResourceList)
						if !ok {
							return respMappingList
						}

						for _, item := range mappingList.Items {
							if item.Spec != nil && req.InterfaceName == item.Spec.InterfaceName {
								_ = respMappingList.AddItem(item)
							}
						}

						return respMappingList
					},
				)
				return nil
			},
		})

	m.pusher.AddCallback(
		core_mesh.MappingType,
		mappingSyncClient.ClientID(),
		// Push handler: forwards the pushed Mapping list to the client.
		func(items pusher.PushedItems) {
			resourceList := items.ResourceList()
			revision := items.Revision()
			mappingList, ok := resourceList.(*core_mesh.MappingResourceList)
			if !ok {
				return
			}

			err := mappingSyncClient.Send(mappingList, revision)
			if err == nil {
				return
			}
			if errors.Is(err, io.EOF) {
				log.Info("DubboSyncClient finished gracefully")
				report(nil)
				return
			}

			log.Error(err, "send mapping sync response failed", "mappingList", mappingList, "revision", revision)
			report(errors.Wrap(err, "DubboSyncClient send with an error"))
		},
		// Filter: keeps only the Mapping resources this client subscribed to.
		func(resourceList core_model.ResourceList) core_model.ResourceList {
			if resourceList.GetItemType() != core_mesh.MappingType {
				return nil
			}

			// only send Mapping which client subscribed
			subscribed := mappingSyncStream.SubscribedInterfaceNames()

			// The result must be a MappingResourceList: the push handler above
			// only accepts that type and silently returns on anything else.
			newResourceList := &core_mesh.MappingResourceList{}
			for _, resource := range resourceList.GetItems() {
				for _, interfaceName := range subscribed {
					if interfaceName == resource.GetMeta().GetName() && mesh == resource.GetMeta().GetMesh() {
						// NOTE(review): AddItem's error is discarded as before;
						// presumably it only fails on an item type mismatch,
						// which the GetItemType check above rules out - confirm.
						_ = newResourceList.AddItem(resource)
						break
					}
				}
			}

			return newResourceList
		},
	)

	// in the end, remove callback of this client
	defer m.pusher.RemoveCallback(core_mesh.MappingType, mappingSyncClient.ClientID())

	go func() {
		// Handle requests from client
		err := mappingSyncClient.HandleReceive()
		if errors.Is(err, io.EOF) {
			log.Info("DubboSyncClient finished gracefully")
			report(nil)
			return
		}

		log.Error(err, "DubboSyncClient finished with an error")
		report(errors.Wrap(err, "DubboSyncClient finished with an error"))
	}()

	// Wait for the first outcome from either the receive loop or a push.
	err := <-errChan
	if err == nil {
		log.Info("MappingSync finished gracefully")
		return nil
	}

	log.Error(err, "MappingSync finished with an error")
	return status.Error(codes.Internal, err.Error())
}