in ptp/ptp4u/server/server.go [306:398]
// handleGeneralMessages reads packets from the general socket (s.gFd) in an
// endless loop and serves the unicast negotiation requests carried in PTP
// signaling messages. It never returns: read and parse failures are logged
// and the loop moves on to the next packet.
//
// generalConn is only used to report the local address when a read fails.
//
// For every RequestUnicastTransmissionTLV asking for Announce, Sync or
// DelayResp messages the matching subscription is looked up (or created and
// registered) on the worker picked by findWorker. A request whose interval is
// below Config.MinSubInterval, whose duration exceeds Config.MaxSubDuration,
// or which arrives after s.ctx is done, is answered with a grant carrying 0
// in place of the requested duration and leaves an already registered
// subscription untouched. An accepted request refreshes the subscription,
// starts it if it is not running and is answered with the requested duration.
// A CancelUnicastTransmissionTLV stops the matching subscription, if any.
func (s *Server) handleGeneralMessages(generalConn *net.UDPConn) {
	buf := make([]byte, timestamp.PayloadSizeBytes)
	// signaling is reused for every packet; its TLV list is reset to zerotlv
	// before each decode so TLVs of a previous packet are not carried over.
	signaling := &ptp.Signaling{}
	zerotlv := []ptp.TLV{}
	// Initialize the new random. We will re-seed it every time in findWorker
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	var grantType ptp.MessageType
	var durationt time.Duration
	var intervalt time.Duration
	var expire time.Time
	var worker *sendWorker
	var sc *SubscriptionClient
	for {
		bbuf, gclisa, err := readPacketBuf(s.gFd, buf)
		if err != nil {
			log.Errorf("Failed to read packet on %s: %v", generalConn.LocalAddr(), err)
			continue
		}
		msgType, err := ptp.ProbeMsgType(buf[:bbuf])
		if err != nil {
			log.Errorf("Failed to probe the ptp message type: %v", err)
			continue
		}
		// Only signaling messages are handled; any other message type is
		// silently ignored.
		switch msgType {
		case ptp.MessageSignaling:
			signaling.TLVs = zerotlv
			if err := ptp.FromBytes(buf[:bbuf], signaling); err != nil {
				log.Error(err)
				continue
			}
			for _, tlv := range signaling.TLVs {
				switch v := tlv.(type) {
				case *ptp.RequestUnicastTransmissionTLV:
					grantType = v.MsgTypeAndReserved.MsgType()
					log.Debugf("Got %s grant request", grantType)
					durationt = time.Duration(v.DurationField) * time.Second
					expire = time.Now().Add(durationt)
					intervalt = v.LogInterMessagePeriod.Duration()
					switch grantType {
					case ptp.MessageAnnounce, ptp.MessageSync, ptp.MessageDelayResp:
						// Decide up front whether the request is out of limit, so
						// that a request we are about to reject cannot alter a
						// subscription which is already registered.
						reject := intervalt < s.Config.MinSubInterval || durationt > s.Config.MaxSubDuration || s.ctx.Err() != nil
						worker = s.findWorker(signaling.SourcePortIdentity, r)
						sc = worker.FindSubscription(signaling.SourcePortIdentity, grantType)
						if sc == nil {
							// sendGrant takes the subscription as an argument, so one
							// is created and registered even for a request that ends
							// up rejected; it is only started on acceptance.
							ip := timestamp.SockaddrToIP(gclisa)
							eclisa := timestamp.IPToSockaddr(ip, ptp.PortEvent)
							sc = NewSubscriptionClient(worker.queue, eclisa, gclisa, grantType, s.Config, intervalt, expire)
							worker.RegisterSubscription(signaling.SourcePortIdentity, grantType, sc)
						} else if !reject {
							// Update existing subscription data
							sc.expire = expire
							sc.interval = intervalt
							// Update gclisa in case of renewal. This is against the standard,
							// but we want to be able to respond to DelayResps coming from ephemeral ports
							sc.gclisa = gclisa
						}
						// Reject queries out of limit
						if reject {
							s.sendGrant(sc, signaling, v.MsgTypeAndReserved, v.LogInterMessagePeriod, 0, gclisa)
							// Proceeds to the next TLV of this message; the stats
							// counter below is not incremented for rejections.
							continue
						}
						if !sc.Running() {
							go sc.Start(s.ctx)
						}
						// Send confirmation grant
						s.sendGrant(sc, signaling, v.MsgTypeAndReserved, v.LogInterMessagePeriod, v.DurationField, gclisa)
					default:
						log.Errorf("Got unsupported grant type %s", grantType)
					}
					s.Stats.IncRXSignaling(grantType)
				case *ptp.CancelUnicastTransmissionTLV:
					grantType = v.MsgTypeAndFlags.MsgType()
					log.Debugf("Got %s cancel request", grantType)
					worker = s.findWorker(signaling.SourcePortIdentity, r)
					sc = worker.FindSubscription(signaling.SourcePortIdentity, grantType)
					if sc != nil {
						sc.Stop()
					}
				default:
					// Reached for any TLV other than the two handled above; the
					// value logged is the type of the enclosing message.
					log.Errorf("Got unsupported message type %s(%d)", msgType, msgType)
				}
			}
		}
	}
}