func newRunCmd()

in tools/xds-client/main.go [45:185]


func newRunCmd() *cobra.Command {
	log := core.Log.WithName("dubbo-xds-client").WithName("run")
	args := struct {
		xdsServerAddress string
		dps              int
		services         int
		inbounds         int
		outbounds        int
		rampUpPeriod     time.Duration
	}{
		xdsServerAddress: "grpc://localhost:5678",
		dps:              100,
		services:         50,
		inbounds:         1,
		outbounds:        3,
		rampUpPeriod:     30 * time.Second,
	}
	cmd := &cobra.Command{
		Use:   "run",
		Short: "Start xDS client(s) that simulate Envoy",
		Long:  `Start xDS client(s) that simulate Envoy.`,
		RunE: func(cmd *cobra.Command, _ []string) error {
			ipRand := rand.Uint32() // #nosec G404 -- that's just a test tool
			log.Info("going to start xDS clients (Envoy simulators)", "dps", args.dps)
			errCh := make(chan error, 1)
			for i := 0; i < args.dps; i++ {
				id := fmt.Sprintf("default.dataplane-%d", i)
				nodeLog := log.WithName("envoy-simulator").WithValues("idx", i, "ID", id)
				nodeLog.Info("creating an xDS client ...")

				go func(i int) {
					buf := make([]byte, 4)
					binary.LittleEndian.PutUint32(buf, ipRand+uint32(i))
					ip := net.IP(buf).String()

					dpSpec := &v1alpha1.Dataplane{
						Networking: &v1alpha1.Dataplane_Networking{
							Address: ip,
						},
					}
					for j := 0; j < args.inbounds; j++ {
						service := fmt.Sprintf("service-%d", rand.Int()%args.services) // #nosec G404 -- that's just a test tool
						dpSpec.Networking.Inbound = append(dpSpec.Networking.Inbound, &v1alpha1.Dataplane_Networking_Inbound{
							Port: uint32(8080 + j),
							Tags: map[string]string{
								v1alpha1.ServiceTag:  service,
								v1alpha1.ProtocolTag: "http",
							},
						})
					}
					for j := 0; j < args.outbounds; j++ {
						service := fmt.Sprintf("service-%d", rand.Int()%args.services) // #nosec G404 -- that's just a test tool
						dpSpec.Networking.Outbound = append(dpSpec.Networking.Outbound, &v1alpha1.Dataplane_Networking_Outbound{
							Port: uint32(10080 + j), Tags: map[string]string{v1alpha1.ServiceTag: service},
						})
					}

					dp := &unversioned.Resource{
						Meta: rest_v1alpha1.ResourceMeta{Mesh: "default", Name: fmt.Sprintf("dataplane-%d.dubbo-system", i), Type: "Dataplane"},
						Spec: dpSpec,
					}

					// add some jitter
					delay := time.Duration(int64(float64(args.rampUpPeriod.Nanoseconds()) * rand.Float64())) // #nosec G404 -- that's just a test tool
					// wait
					<-time.After(delay)
					// proceed

					errCh <- func() (errs error) {
						client, err := stream.New(args.xdsServerAddress)
						if err != nil {
							return errors.Wrap(err, "failed to connect to xDS server")
						}
						defer func() {
							nodeLog.Info("closing a connection ...")
							if err := client.Close(); err != nil {
								errs = multierr.Append(errs, errors.Wrapf(err, "failed to close a connection"))
							}
						}()

						nodeLog.Info("opening an xDS stream ...")
						stream, err := client.StartStream()
						if err != nil {
							return errors.Wrap(err, "failed to start an xDS stream")
						}
						defer func() {
							nodeLog.Info("closing an xDS stream ...")
							if err := stream.Close(); err != nil {
								errs = multierr.Append(errs, errors.Wrapf(err, "failed to close an xDS stream"))
							}
						}()

						nodeLog.Info("requesting Listeners")
						e := stream.Request(id, envoy_resource.ListenerType, dp)
						if e != nil {
							return errors.Wrapf(e, "failed to request %q", envoy_resource.ListenerType)
						}

						nodeLog.Info("requesting Clusters")
						e = stream.Request(id, envoy_resource.ClusterType, dp)
						if e != nil {
							return errors.Wrapf(e, "failed to request %q", envoy_resource.ClusterType)
						}

						nodeLog.Info("requesting Endpoints")
						e = stream.Request(id, envoy_resource.EndpointType, dp)
						if e != nil {
							return errors.Wrapf(e, "failed to request %q", envoy_resource.EndpointType)
						}

						for {
							nodeLog.Info("waiting for a discovery response ...")
							resp, err := stream.WaitForResources()
							if err != nil {
								return errors.Wrap(err, "failed to receive a discovery response")
							}
							nodeLog.Info("received xDS resources", "type", resp.TypeUrl, "version", resp.VersionInfo, "nonce", resp.Nonce, "resources", len(resp.Resources))

							if err := stream.ACK(resp.TypeUrl); err != nil {
								return errors.Wrap(err, "failed to ACK a discovery response")
							}
							nodeLog.Info("ACKed discovery response", "type", resp.TypeUrl, "version", resp.VersionInfo, "nonce", resp.Nonce)
						}
					}()
				}(i)
			}

			err := <-errCh

			return errors.Wrap(err, "one of xDS clients (Envoy simulators) terminated with an error")
		},
	}
	// flags
	cmd.PersistentFlags().StringVar(&args.xdsServerAddress, "xds-server-address", args.xdsServerAddress, "address of xDS server")
	cmd.PersistentFlags().IntVar(&args.dps, "dps", args.dps, "number of dataplanes to emulate")
	cmd.PersistentFlags().IntVar(&args.services, "services", args.services, "number of services")
	cmd.PersistentFlags().IntVar(&args.inbounds, "inbounds", args.inbounds, "number of inbounds")
	cmd.PersistentFlags().IntVar(&args.outbounds, "outbounds", args.outbounds, "number of outbounds")
	cmd.PersistentFlags().DurationVar(&args.rampUpPeriod, "rampup-period", args.rampUpPeriod, "ramp up period")
	return cmd
}