in pkg/controller/service/pingmesh.go [41:115]
// dispatchPingTask builds a ping task from src to dst, commits it for
// src.Nodename, and starts a goroutine that waits for the task result and
// publishes the outcome on latencyResult.
//
// Accepted source types are "Pod" and "Node"; accepted destination types are
// "Pod", "Node" and "IP" (for "IP", dst.Name is used as the destination
// verbatim). Any other type is rejected with an error before a task is
// committed.
//
// When an error is returned, no goroutine has been started and taskGroup has
// not been touched. Otherwise taskGroup is incremented once and released when
// the waiting goroutine exits.
//
// A task that errors, reports failure, or yields no result at all is published
// as a Latency whose avg/max/min are all 9999.9. A successful result that
// carries no ping payload is logged and nothing is published for it.
func (c *controller) dispatchPingTask(ctx context.Context, src, dst NodeInfo, taskGroup *sync.WaitGroup, latencyResult chan<- *Latency) error {
	// Sentinel latency published for a pair whose ping task did not succeed.
	const failedLatency = 9999.9

	taskID := strconv.Itoa(int(getTaskIdx()))
	pingInfo := &rpc.PingInfo{}
	var err error

	switch src.Type {
	case "Pod":
		// NOTE(review): src.Nodename is used as-is for Pod sources; presumably
		// the caller fills it in — confirm.
		pingInfo.Pod, pingInfo.Node, _, err = c.getPodInfo(ctx, src.Namespace, src.Name)
		if err != nil {
			return err
		}
	case "Node":
		// For a node source the task is committed under the node's own name.
		src.Nodename = src.Name
		pingInfo.Node, _, err = c.getNodeInfo(ctx, src.Name)
		if err != nil {
			return err
		}
	case "IP":
		return fmt.Errorf("not support ip as source")
	default:
		// Reject unknown types instead of committing a task with no source.
		return fmt.Errorf("unsupported source type %q", src.Type)
	}

	switch dst.Type {
	case "Pod":
		_, _, pingInfo.Destination, err = c.getPodInfo(ctx, dst.Namespace, dst.Name)
		if err != nil {
			return err
		}
	case "Node":
		_, pingInfo.Destination, err = c.getNodeInfo(ctx, dst.Name)
		if err != nil {
			return err
		}
	case "IP":
		pingInfo.Destination = dst.Name
	default:
		// Reject unknown types instead of pinging an empty destination.
		return fmt.Errorf("unsupported destination type %q", dst.Type)
	}

	_, err = c.commitTask(src.Nodename, &rpc.Task{
		Type: rpc.TaskType_Ping,
		Id:   taskID,
		TaskInfo: &rpc.Task_Ping{
			Ping: pingInfo,
		},
	})
	if err != nil {
		return err
	}

	taskGroup.Add(1)
	go func() {
		defer taskGroup.Done()

		// publish sends one latency record for this src/dst pair.
		publish := func(avg, hi, lo float64) {
			latencyResult <- &Latency{
				Source:     &src,
				Target:     &dst,
				LatencyAvg: avg,
				LatencyMax: hi,
				LatencyMin: lo,
			}
		}

		result, err := c.waitTaskResult(ctx, taskID)
		switch {
		case err != nil:
			log.Errorf("wait task result error: %v", err)
			publish(failedLatency, failedLatency, failedLatency)
			return
		case result == nil:
			// Guard against a (nil, nil) return so the field accesses below
			// cannot panic inside this goroutine.
			log.Errorf("wait task result error: empty result for task %s", taskID)
			publish(failedLatency, failedLatency, failedLatency)
			return
		case !result.Success:
			log.Errorf("wait task result error result: %+v", result.Message)
			publish(failedLatency, failedLatency, failedLatency)
			return
		}

		pingResult := result.GetPing()
		if pingResult == nil {
			// Nothing to report; log it so the missing pair is traceable.
			log.Errorf("task %s succeeded but returned no ping result", taskID)
			return
		}
		publish(
			float64(pingResult.GetAvg()),
			float64(pingResult.GetMax()),
			float64(pingResult.GetMin()),
		)
	}()
	return nil
}