ptp/ptp4u/server/server.go
/*
Copyright (c) Facebook, Inc. and its affiliates.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package server implements a simple UDP server to work with PTP packets.
In addition, it runs the drain checker, announce and stats implementations.
*/
package server
import (
"context"
"fmt"
"math/rand"
"net"
"sync"
"time"
ptp "github.com/facebook/time/ptp/protocol"
"github.com/facebook/time/ptp/ptp4u/drain"
"github.com/facebook/time/ptp/ptp4u/stats"
"github.com/facebook/time/timestamp"
log "github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)
// Server is a PTP unicast server
type Server struct {
Config *Config
Stats stats.Stats
Checks []drain.Drain
sw []*sendWorker
// server source fds
eFd int
gFd int
// drain logic
cancel context.CancelFunc
ctx context.Context
}
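// A minimal usage sketch, assuming a populated *Config named cfg. The Stats
// and Checks values are illustrative (NewJSONStats is the constructor in the
// sibling stats package); any stats.Stats and drain.Drain implementations work:
//
//	s := &Server{
//		Config: cfg,
//		Stats:  stats.NewJSONStats(),
//		Checks: []drain.Drain{},
//	}
//	log.Fatal(s.Start())
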
// Start binds to the event and general UDP ports and starts the send workers and background routines
func (s *Server) Start() error {
// Set clock identity
iface, err := net.InterfaceByName(s.Config.Interface)
if err != nil {
return fmt.Errorf("unable to get mac address of the interface: %v", err)
}
s.Config.clockIdentity, err = ptp.NewClockIdentity(iface.HardwareAddr)
if err != nil {
return fmt.Errorf("unable to get the Clock Identity (EUI-64 address) of the interface: %v", err)
}
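	// NewClockIdentity derives an EUI-64 identity from the 48-bit MAC using
	// the standard mapping, e.g. 0c:42:a1:00:00:01 -> 0c42a1fffe000001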
// initialize the context for the subscriptions
s.ctx, s.cancel = context.WithCancel(context.Background())
	// Call wg.Add(1) ONLY once:
	// no matter how many goroutines we run, as soon as ANY of them
	// finishes and calls wg.Done, wg.Wait will unblock
var wg sync.WaitGroup
wg.Add(1)
	// start the send workers
s.sw = make([]*sendWorker, s.Config.SendWorkers)
for i := 0; i < s.Config.SendWorkers; i++ {
		// Each worker monitors its own queue
s.sw[i] = newSendWorker(i, s.Config, s.Stats)
go func(i int) {
defer wg.Done()
s.sw[i].Start()
}(i)
}
go func() {
defer wg.Done()
s.startGeneralListener()
}()
go func() {
defer wg.Done()
s.startEventListener()
}()
// Run active metric reporting
go func() {
defer wg.Done()
for {
<-time.After(s.Config.MetricInterval)
for _, w := range s.sw {
w.inventoryClients()
}
s.Stats.SetUTCOffset(int64(s.Config.UTCOffset.Seconds()))
s.Stats.Snapshot()
s.Stats.Reset()
}
}()
// Drain check
go func() {
defer wg.Done()
ticker := time.NewTicker(s.Config.DrainInterval)
defer ticker.Stop()
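		// "for ; true; <-ticker.C" runs the body immediately and then once
		// per tick, instead of sleeping a full DrainInterval before the
		// first check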
for ; true; <-ticker.C {
var drain bool
for _, check := range s.Checks {
if check.Check() {
drain = true
					log.Warningf("%T engaged, shifting traffic", check)
break
}
}
if drain {
s.Drain()
s.Stats.SetDrain(1)
} else {
s.Undrain()
s.Stats.SetDrain(0)
}
}
}()
// Update UTC offset periodically
go func() {
defer wg.Done()
for {
<-time.After(1 * time.Minute)
if s.Config.Leapsectz {
if err := s.Config.SetUTCOffsetFromLeapsectz(); err != nil {
log.Errorf("Failed to update UTC offset: %v. Keeping the last known: %s", err, s.Config.UTCOffset)
}
} else if s.Config.SHM {
if err := s.Config.SetUTCOffsetFromSHM(); err != nil {
log.Errorf("Failed to update UTC offset: %v. Keeping the last known: %s", err, s.Config.UTCOffset)
}
}
log.Debugf("UTC offset is: %v", s.Config.UTCOffset.Seconds())
}
}()
	// Wait for ANY goroutine to finish
wg.Wait()
	return fmt.Errorf("one of the server routines finished")
}
// startEventListener launches the listener which handles incoming event messages (delay requests)
func (s *Server) startEventListener() {
var err error
log.Infof("Binding on %s %d", s.Config.IP, ptp.PortEvent)
eventConn, err := net.ListenUDP("udp", &net.UDPAddr{IP: s.Config.IP, Port: ptp.PortEvent})
if err != nil {
log.Fatalf("Listening error: %s", err)
}
defer eventConn.Close()
// get connection file descriptor
s.eFd, err = timestamp.ConnFd(eventConn)
if err != nil {
log.Fatalf("Getting event connection FD: %s", err)
}
// Enable RX timestamps. Delay requests need to be timestamped by ptp4u on receipt
switch s.Config.TimestampType {
case timestamp.HWTIMESTAMP:
if err := timestamp.EnableHWTimestamps(s.eFd, s.Config.Interface); err != nil {
log.Fatalf("Cannot enable hardware RX timestamps: %v", err)
}
case timestamp.SWTIMESTAMP:
if err := timestamp.EnableSWTimestamps(s.eFd); err != nil {
log.Fatalf("Cannot enable software RX timestamps: %v", err)
}
default:
log.Fatalf("Unrecognized timestamp type: %s", s.Config.TimestampType)
}
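	// Go sockets are non-blocking by default, and the raw unix.Recvfrom/
	// Recvmsg calls used below bypass the runtime poller, so the fd has to
	// be switched back to blocking mode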
err = unix.SetNonblock(s.eFd, false)
if err != nil {
log.Fatalf("Failed to set socket to blocking: %s", err)
}
	// Call wg.Add(1) ONLY once:
	// no matter how many goroutines we run, as soon as ANY of them
	// finishes and calls wg.Done, wg.Wait will unblock
var wg sync.WaitGroup
wg.Add(1)
for i := 0; i < s.Config.RecvWorkers; i++ {
go func() {
defer wg.Done()
s.handleEventMessages(eventConn)
}()
}
wg.Wait()
}
// startGeneralListener launches the listener which handles incoming general messages (signaling)
func (s *Server) startGeneralListener() {
var err error
log.Infof("Binding on %s %d", s.Config.IP, ptp.PortGeneral)
generalConn, err := net.ListenUDP("udp", &net.UDPAddr{IP: s.Config.IP, Port: ptp.PortGeneral})
if err != nil {
log.Fatalf("Listening error: %s", err)
}
defer generalConn.Close()
// get connection file descriptor
s.gFd, err = timestamp.ConnFd(generalConn)
if err != nil {
log.Fatalf("Getting general connection FD: %s", err)
}
err = unix.SetNonblock(s.gFd, false)
if err != nil {
log.Fatalf("Failed to set socket to blocking: %s", err)
}
	// Call wg.Add(1) ONLY once:
	// no matter how many goroutines we run, as soon as ANY of them
	// finishes and calls wg.Done, wg.Wait will unblock
var wg sync.WaitGroup
wg.Add(1)
for i := 0; i < s.Config.RecvWorkers; i++ {
go func() {
defer wg.Done()
s.handleGeneralMessages(generalConn)
}()
}
wg.Wait()
}
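// readPacketBuf reads a single packet from the raw connection fd into buf,
// returning the number of bytes read and the sender's address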
func readPacketBuf(connFd int, buf []byte) (int, unix.Sockaddr, error) {
n, saddr, err := unix.Recvfrom(connFd, buf, 0)
if err != nil {
return 0, nil, err
}
	return n, saddr, nil
}
// handleEventMessages reads packets from the event port in a loop and processes delay requests
func (s *Server) handleEventMessages(eventConn *net.UDPConn) {
buf := make([]byte, timestamp.PayloadSizeBytes)
oob := make([]byte, timestamp.ControlSizeBytes)
dReq := &ptp.SyncDelayReq{}
	// Initialize a new random source. It is re-seeded on every call to findWorker
r := rand.New(rand.NewSource(time.Now().UnixNano()))
var msgType ptp.MessageType
var worker *sendWorker
var sc *SubscriptionClient
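	// buf, oob, dReq and the scratch variables above are allocated once and
	// reused for every packet to keep this hot path allocation-free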
for {
bbuf, clisa, rxTS, err := timestamp.ReadPacketWithRXTimestampBuf(s.eFd, buf, oob)
if err != nil {
log.Errorf("Failed to read packet on %s: %v", eventConn.LocalAddr(), err)
continue
}
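		// PTP runs on the TAI timescale. Hardware (PHC) timestamps are
		// already TAI, but software timestamps come from the UTC system
		// clock and have to be shifted by the configured UTC offset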
if s.Config.TimestampType != timestamp.HWTIMESTAMP {
rxTS = rxTS.Add(s.Config.UTCOffset)
}
msgType, err = ptp.ProbeMsgType(buf[:bbuf])
if err != nil {
log.Errorf("Failed to probe the ptp message type: %v", err)
continue
}
s.Stats.IncRX(msgType)
switch msgType {
case ptp.MessageDelayReq:
if err := ptp.FromBytes(buf[:bbuf], dReq); err != nil {
log.Errorf("Failed to read the ptp SyncDelayReq: %v", err)
continue
}
log.Debugf("Got delay request")
worker = s.findWorker(dReq.Header.SourcePortIdentity, r)
sc = worker.FindSubscription(dReq.Header.SourcePortIdentity, ptp.MessageDelayResp)
if sc == nil {
log.Infof("Delay request from %s is not in the subscription list", timestamp.SockaddrToIP(clisa))
continue
}
sc.UpdateDelayResp(&dReq.Header, rxTS)
sc.Once()
default:
log.Errorf("Got unsupported message type %s(%d)", msgType, msgType)
}
}
}
// handleGeneralMessages reads packets from the general port in a loop and processes signaling messages
func (s *Server) handleGeneralMessages(generalConn *net.UDPConn) {
buf := make([]byte, timestamp.PayloadSizeBytes)
signaling := &ptp.Signaling{}
zerotlv := []ptp.TLV{}
	// Initialize a new random source. It is re-seeded on every call to findWorker
r := rand.New(rand.NewSource(time.Now().UnixNano()))
var grantType ptp.MessageType
var durationt time.Duration
var intervalt time.Duration
var expire time.Time
var worker *sendWorker
var sc *SubscriptionClient
for {
bbuf, gclisa, err := readPacketBuf(s.gFd, buf)
if err != nil {
log.Errorf("Failed to read packet on %s: %v", generalConn.LocalAddr(), err)
continue
}
msgType, err := ptp.ProbeMsgType(buf[:bbuf])
if err != nil {
log.Errorf("Failed to probe the ptp message type: %v", err)
continue
}
switch msgType {
case ptp.MessageSignaling:
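			// reset TLVs so entries decoded from the previous packet do not
			// leak into this one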
signaling.TLVs = zerotlv
if err := ptp.FromBytes(buf[:bbuf], signaling); err != nil {
log.Error(err)
continue
}
for _, tlv := range signaling.TLVs {
switch v := tlv.(type) {
case *ptp.RequestUnicastTransmissionTLV:
grantType = v.MsgTypeAndReserved.MsgType()
log.Debugf("Got %s grant request", grantType)
durationt = time.Duration(v.DurationField) * time.Second
expire = time.Now().Add(durationt)
intervalt = v.LogInterMessagePeriod.Duration()
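					// LogInterMessagePeriod is a signed log2 of the interval
					// in seconds: 0 means 1s, 1 means 2s, -3 means 125ms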
switch grantType {
case ptp.MessageAnnounce, ptp.MessageSync, ptp.MessageDelayResp:
worker = s.findWorker(signaling.SourcePortIdentity, r)
sc = worker.FindSubscription(signaling.SourcePortIdentity, grantType)
if sc == nil {
ip := timestamp.SockaddrToIP(gclisa)
eclisa := timestamp.IPToSockaddr(ip, ptp.PortEvent)
sc = NewSubscriptionClient(worker.queue, eclisa, gclisa, grantType, s.Config, intervalt, expire)
worker.RegisterSubscription(signaling.SourcePortIdentity, grantType, sc)
} else {
// Update existing subscription data
sc.expire = expire
sc.interval = intervalt
// Update gclisa in case of renewal. This is against the standard,
// but we want to be able to respond to DelayResps coming from ephemeral ports
sc.gclisa = gclisa
}
						// Reject out-of-limit requests, and everything while
						// draining: a grant with duration 0 is a denial
if intervalt < s.Config.MinSubInterval || durationt > s.Config.MaxSubDuration || s.ctx.Err() != nil {
s.sendGrant(sc, signaling, v.MsgTypeAndReserved, v.LogInterMessagePeriod, 0, gclisa)
continue
}
if !sc.Running() {
go sc.Start(s.ctx)
}
// Send confirmation grant
s.sendGrant(sc, signaling, v.MsgTypeAndReserved, v.LogInterMessagePeriod, v.DurationField, gclisa)
default:
log.Errorf("Got unsupported grant type %s", grantType)
}
s.Stats.IncRXSignaling(grantType)
case *ptp.CancelUnicastTransmissionTLV:
grantType = v.MsgTypeAndFlags.MsgType()
log.Debugf("Got %s cancel request", grantType)
worker = s.findWorker(signaling.SourcePortIdentity, r)
sc = worker.FindSubscription(signaling.SourcePortIdentity, grantType)
if sc != nil {
sc.Stop()
}
default:
					log.Errorf("Got unsupported TLV type %T", v)
}
}
}
}
}
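// findWorker deterministically maps a client to a send worker: re-seeding r
// from the client's PortIdentity makes r.Intn return the same index for the
// same client, so all of a client's subscriptions share one worker queue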
func (s *Server) findWorker(clientID ptp.PortIdentity, r *rand.Rand) *sendWorker {
	// Seeding with the same value always yields the same sequence, so the
	// same client is consistently mapped to the same worker
r.Seed(int64(clientID.ClockIdentity) + int64(clientID.PortNumber))
return s.sw[r.Intn(s.Config.SendWorkers)]
}
// sendGrant sends a Unicast Grant message
func (s *Server) sendGrant(sc *SubscriptionClient, sg *ptp.Signaling, mt ptp.UnicastMsgTypeAndFlags, interval ptp.LogInterval, duration uint32, sa unix.Sockaddr) {
sc.UpdateGrant(sg, mt, interval, duration)
grantb, err := ptp.Bytes(sc.Grant())
if err != nil {
log.Errorf("Failed to prepare the unicast grant: %v", err)
return
}
err = unix.Sendto(s.gFd, grantb, 0, sa)
if err != nil {
log.Errorf("Failed to send the unicast grant: %v", err)
return
}
log.Debugf("Sent unicast grant")
s.Stats.IncTXSignaling(sc.subscriptionType)
}
// Drain stops the server from granting subscriptions by cancelling the subscription context
func (s *Server) Drain() {
if s.ctx != nil && s.ctx.Err() == nil {
s.cancel()
}
}
// Undrain resumes normal operation by re-creating the subscription context
func (s *Server) Undrain() {
if s.ctx != nil && s.ctx.Err() != nil {
s.ctx, s.cancel = context.WithCancel(context.Background())
}
}