in tools/dds-client/main.go [39:173]
// newRunCmd builds the "run" sub-command of the dDS test client.
//
// When executed, the command:
//  1. connects to the server given by --dds-server-address
//     (default "grpc://localhost:8888");
//  2. registers a hard-coded sample mapping and metadata entry, using the
//     POD_NAME environment variable as the pod name;
//  3. opens a mapping stream and a metadata stream and sends one sync request
//     with empty fields on each;
//  4. receives and ACKs responses on both streams in two goroutines until
//     they fail.
//
// The returned error is the first failure encountered. Errors from closing the
// streams and the connection are logged rather than returned, because by the
// time the deferred closers run RunE is already returning a non-nil error.
func newRunCmd() *cobra.Command {
	args := struct {
		ddsServerAddress string
	}{
		ddsServerAddress: "grpc://localhost:8888",
	}
	cmd := &cobra.Command{
		Use:   "run",
		Short: "Start dDS client(s)",
		Long:  `Start dDS client(s)`,
		RunE: func(cmd *cobra.Command, _ []string) error {
			client, err := stream.New(args.ddsServerAddress)
			if err != nil {
				return errors.Wrap(err, "failed to connect to dDS server")
			}
			defer func() {
				mdsLog.Info("closing a connection ...")
				if err := client.Close(); err != nil {
					mdsLog.Error(err, "failed to close a connection")
				}
			}()

			ctx := context.Background()

			// register mapping and metadata
			err = client.MappingRegister(ctx, &mesh_proto.MappingRegisterRequest{
				Namespace:       "dubbo-system",
				ApplicationName: "test",
				InterfaceNames:  []string{"a1", "a2"},
				PodName:         os.Getenv("POD_NAME"),
			})
			if err != nil {
				return errors.Wrap(err, "failed to register a mapping")
			}
			err = client.MetadataRegister(ctx, &mesh_proto.MetaDataRegisterRequest{
				Namespace: "dubbo-system",
				PodName:   os.Getenv("POD_NAME"),
				Metadata: &mesh_proto.MetaData{
					App:      "test",
					Revision: "11111",
				},
			})
			if err != nil {
				return errors.Wrap(err, "failed to register metadata")
			}

			mdsLog.Info("opening a dds stream ...")
			mappingStream, err := client.StartMappingStream()
			if err != nil {
				return errors.Wrap(err, "failed to start an mDS mapping stream")
			}
			defer func() {
				mdsLog.Info("closing a mds mapping stream ... ")
				if err := mappingStream.Close(); err != nil {
					mdsLog.Error(err, "failed to close a mapping stream")
				}
			}()
			metadataStream, err := client.StartMetadataSteam()
			if err != nil {
				return errors.Wrap(err, "failed to start an mDS metadata stream")
			}
			defer func() {
				mdsLog.Info("closing a mds metadata stream ... ")
				if err := metadataStream.Close(); err != nil {
					mdsLog.Error(err, "failed to close a metadata stream")
				}
			}()

			// mapping and metadata request; every field is left empty
			err = mappingStream.MappingSyncRequest(&mesh_proto.MappingSyncRequest{
				Namespace:     "",
				Nonce:         "",
				InterfaceName: "",
			})
			if err != nil {
				return errors.Wrap(err, "failed to send a mapping sync request")
			}
			err = metadataStream.MetadataSyncRequest(&mesh_proto.MetadataSyncRequest{
				Namespace:       "",
				Nonce:           "",
				ApplicationName: "",
				Revision:        "",
			})
			if err != nil {
				return errors.Wrap(err, "failed to send a metadata sync request")
			}

			// One receive/ACK loop per stream. Neither loop returns nil, so
			// Wait only returns once BOTH loops have failed and then yields
			// the first error that occurred.
			var eg errgroup.Group
			eg.Go(func() error {
				for {
					mdsLog.Info("waiting for a mapping response ...")
					resp, err := mappingStream.WaitForMappingResource()
					if err != nil {
						return errors.Wrap(err, "failed to receive a mapping response")
					}
					mdsLog.Info("recv mapping", "resp", resp)
					if err := mappingStream.MappingACK(); err != nil {
						return errors.Wrap(err, "failed to ACK a mapping response")
					}
				}
			})
			eg.Go(func() error {
				for {
					mdsLog.Info("waiting for a metadata response ...")
					resp, err := metadataStream.WaitForMetadataResource()
					if err != nil {
						return errors.Wrap(err, "failed to receive a metadata response")
					}
					mdsLog.Info("recv metadata", "resp", resp)
					if err := metadataStream.MetadataACK(); err != nil {
						return errors.Wrap(err, "failed to ACK a metadata response")
					}
				}
			})
			return eg.Wait()
		},
	}
	// flags
	cmd.PersistentFlags().StringVar(&args.ddsServerAddress, "dds-server-address", args.ddsServerAddress, "dDS server address")
	return cmd
}