func newRunCmd()

in tools/dds-client/main.go [39:173]


// newRunCmd builds the "run" sub-command.
//
// When executed, the command:
//  1. connects to the server given by --dds-server-address,
//  2. registers a fixed test mapping and test metadata,
//  3. opens a mapping stream and a metadata stream and sends one sync
//     request on each,
//  4. loops on both streams, logging and ACKing every response, until a
//     receive or an ACK fails.
//
// The connection and both streams are closed when the command returns;
// failures to close are logged rather than returned so that they never mask
// the error that ended the run.
func newRunCmd() *cobra.Command {
	args := struct {
		ddsServerAddress string
	}{
		ddsServerAddress: "grpc://localhost:8888",
	}
	cmd := &cobra.Command{
		Use:   "run",
		Short: "Start dDS client(s)",
		Long:  `Start dDS client(s)`,
		RunE: func(cmd *cobra.Command, _ []string) error {
			client, err := stream.New(args.ddsServerAddress)
			if err != nil {
				return errors.Wrap(err, "failed to connect to dDS server")
			}
			defer func() {
				mdsLog.Info("closing a connection ...")
				if err := client.Close(); err != nil {
					mdsLog.Error(err, "failed to close a connection")
				}
			}()

			ctx := context.Background()
			// Read once so both registrations are guaranteed to use the same pod name.
			podName := os.Getenv("POD_NAME")

			// register mapping and metadata
			err = client.MappingRegister(ctx, &mesh_proto.MappingRegisterRequest{
				Namespace:       "dubbo-system",
				ApplicationName: "test",
				InterfaceNames:  []string{"a1", "a2"},
				PodName:         podName,
			})
			if err != nil {
				return errors.Wrap(err, "failed to register a mapping")
			}
			err = client.MetadataRegister(ctx, &mesh_proto.MetaDataRegisterRequest{
				Namespace: "dubbo-system",
				PodName:   podName,
				Metadata: &mesh_proto.MetaData{
					App:      "test",
					Revision: "11111",
				},
			})
			if err != nil {
				return errors.Wrap(err, "failed to register metadata")
			}

			mdsLog.Info("opening a dds stream ...")
			mappingStream, err := client.StartMappingStream()
			if err != nil {
				return errors.Wrap(err, "failed to start a mapping stream")
			}
			defer func() {
				mdsLog.Info("closing a mds mapping stream ... ")
				if err := mappingStream.Close(); err != nil {
					mdsLog.Error(err, "failed to close a mapping stream")
				}
			}()

			metadataStream, err := client.StartMetadataSteam()
			if err != nil {
				return errors.Wrap(err, "failed to start a metadata stream")
			}
			defer func() {
				mdsLog.Info("closing a mds metadata stream ... ")
				if err := metadataStream.Close(); err != nil {
					mdsLog.Error(err, "failed to close a metadata stream")
				}
			}()

			// mapping and metadata request; every field is deliberately left empty.
			err = mappingStream.MappingSyncRequest(&mesh_proto.MappingSyncRequest{
				Namespace:     "",
				Nonce:         "",
				InterfaceName: "",
			})
			if err != nil {
				return errors.Wrap(err, "failed to send a mapping sync request")
			}

			err = metadataStream.MetadataSyncRequest(&mesh_proto.MetadataSyncRequest{
				Namespace:       "",
				Nonce:           "",
				ApplicationName: "",
				Revision:        "",
			})
			if err != nil {
				return errors.Wrap(err, "failed to send a metadata sync request")
			}

			var eg errgroup.Group

			// Mapping receive loop: only returns when a receive or an ACK fails.
			eg.Go(func() error {
				for {
					mdsLog.Info("waiting for a mapping response ...")
					resp, err := mappingStream.WaitForMappingResource()
					if err != nil {
						return errors.Wrap(err, "failed to receive a mapping response")
					}

					mdsLog.Info("recv mapping", "response", resp)

					if err := mappingStream.MappingACK(); err != nil {
						return errors.Wrap(err, "failed to ACK a mapping response")
					}
				}
			})

			// Metadata receive loop: only returns when a receive or an ACK fails.
			eg.Go(func() error {
				for {
					mdsLog.Info("waiting for a metadata response ...")
					resp, err := metadataStream.WaitForMetadataResource()
					if err != nil {
						return errors.Wrap(err, "failed to receive a metadata response")
					}

					mdsLog.Info("recv metadata", "response", resp)

					if err := metadataStream.MetadataACK(); err != nil {
						return errors.Wrap(err, "failed to ACK a metadata response")
					}
				}
			})

			// Wait returns the first error reported by either loop, but only
			// after both loops have returned.
			// NOTE(review): if just one stream fails, this keeps blocking until
			// the other loop ends too — confirm that is acceptable for this tool.
			return eg.Wait()
		},
	}
	// flags
	cmd.PersistentFlags().StringVar(&args.ddsServerAddress, "dds-server-address", args.ddsServerAddress, "dDS server address")
	return cmd
}