func()

in pkg/controller/service/pingmesh.go [41:115]


func (c *controller) dispatchPingTask(ctx context.Context, src, dst NodeInfo, taskGroup *sync.WaitGroup, latencyResult chan<- *Latency) error {
	taskID := strconv.Itoa(int(getTaskIdx()))
	pingInfo := &rpc.PingInfo{}
	var err error
	switch src.Type {
	case "Pod":
		pingInfo.Pod, pingInfo.Node, _, err = c.getPodInfo(ctx, src.Namespace, src.Name)
		if err != nil {
			return err
		}
	case "Node":
		src.Nodename = src.Name
		pingInfo.Node, _, err = c.getNodeInfo(ctx, src.Name)
		if err != nil {
			return err
		}
	case "IP":
		return fmt.Errorf("not support ip as source")
	}
	switch dst.Type {
	case "Pod":
		_, _, pingInfo.Destination, err = c.getPodInfo(ctx, dst.Namespace, dst.Name)
		if err != nil {
			return err
		}
	case "Node":
		_, pingInfo.Destination, err = c.getNodeInfo(ctx, dst.Name)
		if err != nil {
			return err
		}
	case "IP":
		pingInfo.Destination = dst.Name
	}

	_, err = c.commitTask(src.Nodename, &rpc.Task{
		Type: rpc.TaskType_Ping,
		Id:   taskID,
		TaskInfo: &rpc.Task_Ping{
			Ping: pingInfo,
		},
	})
	if err != nil {
		return err
	}
	taskGroup.Add(1)
	go func() {
		defer taskGroup.Done()
		result, err := c.waitTaskResult(ctx, taskID)
		if err != nil || !result.Success {
			if err != nil {
				log.Errorf("wait task result error: %v", err)
			} else {
				log.Errorf("wait task result error result: %+v", result.Message)
			}
			latencyResult <- &Latency{
				Source:     &src,
				Target:     &dst,
				LatencyAvg: 9999.9,
				LatencyMax: 9999.9,
				LatencyMin: 9999.9,
			}
			return
		}
		if pingResult := result.GetPing(); pingResult != nil {
			latencyResult <- &Latency{
				Source:     &src,
				Target:     &dst,
				LatencyAvg: float64(pingResult.GetAvg()),
				LatencyMax: float64(pingResult.GetMax()),
				LatencyMin: float64(pingResult.GetMin()),
			}
		}
	}()
	return nil
}