func()

in ptp/ptp4u/server/worker.go [144:251]


func (s *sendWorker) Start() {
	eFd, gFd, err := s.listen()
	if err != nil {
		log.Fatal(err)
	}
	defer unix.Close(eFd)
	defer unix.Close(gFd)

	// reusable buffers
	buf := make([]byte, timestamp.PayloadSizeBytes)
	oob := make([]byte, timestamp.ControlSizeBytes)

	// TMP buffers
	toob := make([]byte, timestamp.ControlSizeBytes)

	var (
		n        int
		attempts int
		txTS     time.Time
		c        *SubscriptionClient
	)

	for c = range s.queue {
		switch c.subscriptionType {
		case ptp.MessageSync:
			// send sync
			c.UpdateSync()
			n, err = ptp.BytesTo(c.Sync(), buf)
			if err != nil {
				log.Errorf("Failed to generate the sync packet: %v", err)
				continue
			}
			log.Debugf("Sending sync")

			err = unix.Sendto(eFd, buf[:n], 0, c.eclisa)
			if err != nil {
				log.Errorf("Failed to send the sync packet: %v", err)
				continue
			}
			s.stats.IncTX(c.subscriptionType)

			txTS, attempts, err = timestamp.ReadTXtimestampBuf(eFd, oob, toob)
			s.stats.SetMaxTXTSAttempts(s.id, int64(attempts))
			if err != nil {
				log.Warningf("Failed to read TX timestamp: %v", err)
				continue
			}
			if s.config.TimestampType != timestamp.HWTIMESTAMP {
				txTS = txTS.Add(s.config.UTCOffset)
			}

			// send followup
			c.UpdateFollowup(txTS)
			n, err = ptp.BytesTo(c.Followup(), buf)
			if err != nil {
				log.Errorf("Failed to generate the followup packet: %v", err)
				continue
			}
			log.Debugf("Sending followup")

			err = unix.Sendto(gFd, buf[:n], 0, c.gclisa)
			if err != nil {
				log.Errorf("Failed to send the followup packet: %v", err)
				continue
			}
			s.stats.IncTX(ptp.MessageFollowUp)
		case ptp.MessageAnnounce:
			// send announce
			c.UpdateAnnounce()
			n, err = ptp.BytesTo(c.Announce(), buf)
			if err != nil {
				log.Errorf("Failed to prepare the announce packet: %v", err)
				continue
			}
			log.Debugf("Sending announce")

			err = unix.Sendto(gFd, buf[:n], 0, c.gclisa)
			if err != nil {
				log.Errorf("Failed to send the announce packet: %v", err)
				continue
			}
			s.stats.IncTX(c.subscriptionType)

		case ptp.MessageDelayResp:
			// send delay response
			n, err = ptp.BytesTo(c.DelayResp(), buf)
			if err != nil {
				log.Errorf("Failed to prepare the delay response packet: %v", err)
				continue
			}
			log.Debugf("Sending delay response")

			err = unix.Sendto(gFd, buf[:n], 0, c.gclisa)
			if err != nil {
				log.Errorf("Failed to send the delay response: %v", err)
				continue
			}
			s.stats.IncTX(c.subscriptionType)

		default:
			log.Errorf("Unknown subscription type: %v", c.subscriptionType)
			continue
		}

		c.IncSequenceID()
		s.stats.SetMaxWorkerQueue(s.id, int64(len(s.queue)))
	}
}