in ptp/ptp4u/server/worker.go [144:251]
// Start obtains the worker's two socket descriptors from listen and then
// processes every client received from s.queue.
//
// What gets transmitted depends on the client's subscription type:
//   - ptp.MessageSync: a sync packet on the first descriptor (to c.eclisa),
//     then a follow-up on the second descriptor (to c.gclisa) carrying the TX
//     timestamp read back after the sync was sent.
//   - ptp.MessageAnnounce and ptp.MessageDelayResp: one packet on the second
//     descriptor (to c.gclisa).
//
// Any failure while handling a client is logged and the loop moves on to the
// next client; in that case neither the sequence ID increment nor the
// queue-length stat update at the bottom of the loop runs. An error from
// listen is handed to log.Fatal. Both descriptors are closed when Start
// returns.
func (s *sendWorker) Start() {
	// NOTE(review): the first descriptor is presumably the PTP event socket and
	// the second the general socket — confirm against listen.
	evFD, genFD, err := s.listen()
	if err != nil {
		log.Fatal(err)
	}
	defer unix.Close(evFD)
	defer unix.Close(genFD)

	// Allocated once and reused for every packet to keep the loop allocation-free.
	packet := make([]byte, timestamp.PayloadSizeBytes)
	control := make([]byte, timestamp.ControlSizeBytes)
	// Second control-sized buffer passed to ReadTXtimestampBuf as temporary space.
	scratch := make([]byte, timestamp.ControlSizeBytes)

	var size int
	for client := range s.queue {
		switch client.subscriptionType {
		case ptp.MessageSync:
			// Step 1: the sync packet itself.
			client.UpdateSync()
			if size, err = ptp.BytesTo(client.Sync(), packet); err != nil {
				log.Errorf("Failed to generate the sync packet: %v", err)
				continue
			}
			log.Debugf("Sending sync")
			if err = unix.Sendto(evFD, packet[:size], 0, client.eclisa); err != nil {
				log.Errorf("Failed to send the sync packet: %v", err)
				continue
			}
			s.stats.IncTX(client.subscriptionType)

			// Step 2: fetch the TX timestamp of the sync just sent. The attempt
			// count is reported to stats whether or not the read succeeded.
			sentAt, tries, tsErr := timestamp.ReadTXtimestampBuf(evFD, control, scratch)
			s.stats.SetMaxTXTSAttempts(s.id, int64(tries))
			if tsErr != nil {
				log.Warningf("Failed to read TX timestamp: %v", tsErr)
				continue
			}
			// Every timestamp type except HWTIMESTAMP gets the configured UTC
			// offset added.
			if s.config.TimestampType != timestamp.HWTIMESTAMP {
				sentAt = sentAt.Add(s.config.UTCOffset)
			}

			// Step 3: the follow-up carrying that timestamp.
			client.UpdateFollowup(sentAt)
			if size, err = ptp.BytesTo(client.Followup(), packet); err != nil {
				log.Errorf("Failed to generate the followup packet: %v", err)
				continue
			}
			log.Debugf("Sending followup")
			if err = unix.Sendto(genFD, packet[:size], 0, client.gclisa); err != nil {
				log.Errorf("Failed to send the followup packet: %v", err)
				continue
			}
			s.stats.IncTX(ptp.MessageFollowUp)

		case ptp.MessageAnnounce:
			client.UpdateAnnounce()
			if size, err = ptp.BytesTo(client.Announce(), packet); err != nil {
				log.Errorf("Failed to prepare the announce packet: %v", err)
				continue
			}
			log.Debugf("Sending announce")
			if err = unix.Sendto(genFD, packet[:size], 0, client.gclisa); err != nil {
				log.Errorf("Failed to send the announce packet: %v", err)
				continue
			}
			s.stats.IncTX(client.subscriptionType)

		case ptp.MessageDelayResp:
			if size, err = ptp.BytesTo(client.DelayResp(), packet); err != nil {
				log.Errorf("Failed to prepare the delay response packet: %v", err)
				continue
			}
			log.Debugf("Sending delay response")
			if err = unix.Sendto(genFD, packet[:size], 0, client.gclisa); err != nil {
				log.Errorf("Failed to send the delay response: %v", err)
				continue
			}
			s.stats.IncTX(client.subscriptionType)

		default:
			log.Errorf("Unknown subscription type: %v", client.subscriptionType)
			continue
		}

		// Reached only when everything for this client was sent successfully.
		client.IncSequenceID()
		s.stats.SetMaxWorkerQueue(s.id, int64(len(s.queue)))
	}
}