func()

in ptp/ptp4u/server/server.go [306:398]


func (s *Server) handleGeneralMessages(generalConn *net.UDPConn) {
	buf := make([]byte, timestamp.PayloadSizeBytes)
	signaling := &ptp.Signaling{}
	zerotlv := []ptp.TLV{}
	// Initialize the new random. We will re-seed it every time in findWorker
	r := rand.New(rand.NewSource(time.Now().UnixNano()))

	var grantType ptp.MessageType
	var durationt time.Duration
	var intervalt time.Duration
	var expire time.Time
	var worker *sendWorker
	var sc *SubscriptionClient

	for {
		bbuf, gclisa, err := readPacketBuf(s.gFd, buf)
		if err != nil {
			log.Errorf("Failed to read packet on %s: %v", generalConn.LocalAddr(), err)
			continue
		}

		msgType, err := ptp.ProbeMsgType(buf[:bbuf])
		if err != nil {
			log.Errorf("Failed to probe the ptp message type: %v", err)
			continue
		}

		switch msgType {
		case ptp.MessageSignaling:
			signaling.TLVs = zerotlv
			if err := ptp.FromBytes(buf[:bbuf], signaling); err != nil {
				log.Error(err)
				continue
			}

			for _, tlv := range signaling.TLVs {
				switch v := tlv.(type) {
				case *ptp.RequestUnicastTransmissionTLV:
					grantType = v.MsgTypeAndReserved.MsgType()
					log.Debugf("Got %s grant request", grantType)
					durationt = time.Duration(v.DurationField) * time.Second
					expire = time.Now().Add(durationt)
					intervalt = v.LogInterMessagePeriod.Duration()

					switch grantType {
					case ptp.MessageAnnounce, ptp.MessageSync, ptp.MessageDelayResp:
						worker = s.findWorker(signaling.SourcePortIdentity, r)
						sc = worker.FindSubscription(signaling.SourcePortIdentity, grantType)
						if sc == nil {
							ip := timestamp.SockaddrToIP(gclisa)
							eclisa := timestamp.IPToSockaddr(ip, ptp.PortEvent)
							sc = NewSubscriptionClient(worker.queue, eclisa, gclisa, grantType, s.Config, intervalt, expire)
							worker.RegisterSubscription(signaling.SourcePortIdentity, grantType, sc)
						} else {
							// Update existing subscription data
							sc.expire = expire
							sc.interval = intervalt
							// Update gclisa in case of renewal. This is against the standard,
							// but we want to be able to respond to DelayResps coming from ephemeral ports
							sc.gclisa = gclisa
						}

						// Reject queries out of limit
						if intervalt < s.Config.MinSubInterval || durationt > s.Config.MaxSubDuration || s.ctx.Err() != nil {
							s.sendGrant(sc, signaling, v.MsgTypeAndReserved, v.LogInterMessagePeriod, 0, gclisa)
							continue
						}

						if !sc.Running() {
							go sc.Start(s.ctx)
						}

						// Send confirmation grant
						s.sendGrant(sc, signaling, v.MsgTypeAndReserved, v.LogInterMessagePeriod, v.DurationField, gclisa)
					default:
						log.Errorf("Got unsupported grant type %s", grantType)
					}
					s.Stats.IncRXSignaling(grantType)
				case *ptp.CancelUnicastTransmissionTLV:
					grantType = v.MsgTypeAndFlags.MsgType()
					log.Debugf("Got %s cancel request", grantType)
					worker = s.findWorker(signaling.SourcePortIdentity, r)
					sc = worker.FindSubscription(signaling.SourcePortIdentity, grantType)
					if sc != nil {
						sc.Stop()
					}
				default:
					log.Errorf("Got unsupported message type %s(%d)", msgType, msgType)
				}
			}
		}
	}
}