func()

in ptp/ptp4u/server/server.go [255:303]


// handleEventMessages loops forever, reading packets and their RX timestamps
// from the event socket descriptor (s.eFd) and dispatching them by PTP
// message type. eventConn is used only to report the local address when a
// read fails.
//
// Only Delay Request messages are acted upon: the matching DelayResp
// subscription is looked up on the worker chosen by findWorker, updated with
// the request header and receive timestamp, and then sc.Once() is invoked on
// it. Every failure is logged and the offending packet is skipped; the loop
// never returns.
func (s *Server) handleEventMessages(eventConn *net.UDPConn) {
	// Scratch storage reused by every iteration so the loop does not
	// allocate per packet.
	payload := make([]byte, timestamp.PayloadSizeBytes)
	control := make([]byte, timestamp.ControlSizeBytes)
	delayReq := &ptp.SyncDelayReq{}
	// Random source handed to findWorker, which (per the original note) is
	// expected to re-seed it on every call.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	for {
		n, clientAddr, rxTime, err := timestamp.ReadPacketWithRXTimestampBuf(s.eFd, payload, control)
		if err != nil {
			log.Errorf("Failed to read packet on %s: %v", eventConn.LocalAddr(), err)
			continue
		}
		// Any timestamp that is not a hardware one gets the configured UTC
		// offset applied.
		if s.Config.TimestampType != timestamp.HWTIMESTAMP {
			rxTime = rxTime.Add(s.Config.UTCOffset)
		}
		packet := payload[:n]

		kind, err := ptp.ProbeMsgType(packet)
		if err != nil {
			log.Errorf("Failed to probe the ptp message type: %v", err)
			continue
		}

		// Count the packet before deciding whether its type is supported.
		s.Stats.IncRX(kind)

		if kind != ptp.MessageDelayReq {
			log.Errorf("Got unsupported message type %s(%d)", kind, kind)
			continue
		}

		if err := ptp.FromBytes(packet, delayReq); err != nil {
			log.Errorf("Failed to read the ptp SyncDelayReq: %v", err)
			continue
		}

		log.Debugf("Got delay request")
		clientID := delayReq.Header.SourcePortIdentity
		sub := s.findWorker(clientID, rng).FindSubscription(clientID, ptp.MessageDelayResp)
		if sub == nil {
			log.Infof("Delay request from %s is not in the subscription list", timestamp.SockaddrToIP(clientAddr))
			continue
		}
		sub.UpdateDelayResp(&delayReq.Header, rxTime)
		sub.Once()
	}
}