ptp/ptp4u/server/subscription.go (281 lines of code) (raw):
/*
Copyright (c) Facebook, Inc. and its affiliates.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package server implements simple UDP server to work with NTP packets.
In addition, it run checker, announce and stats implementations
*/
package server
import (
"context"
"encoding/binary"
"fmt"
"sync"
"time"
ptp "github.com/facebook/time/ptp/protocol"
"github.com/facebook/time/timestamp"
log "github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)
// SubscriptionClient is sending subscriptionType messages periodically
type SubscriptionClient struct {
sync.Mutex
queue chan *SubscriptionClient
subscriptionType ptp.MessageType
serverConfig *Config
interval time.Duration
expire time.Time
sequenceID uint16
running bool
// socket addresses
eclisa unix.Sockaddr
gclisa unix.Sockaddr
// packets
syncP *ptp.SyncDelayReq
followupP *ptp.FollowUp
announceP *ptp.Announce
delayRespP *ptp.DelayResp
grant *ptp.Signaling
}
// NewSubscriptionClient gets minimal required arguments to create a subscription
func NewSubscriptionClient(q chan *SubscriptionClient, eclisa, gclisa unix.Sockaddr, st ptp.MessageType, sc *Config, i time.Duration, e time.Time) *SubscriptionClient {
s := &SubscriptionClient{
eclisa: eclisa,
gclisa: gclisa,
subscriptionType: st,
interval: i,
expire: e,
queue: q,
serverConfig: sc,
}
s.initSync()
s.initFollowup()
s.initAnnounce()
s.initDelayResp()
s.initGrant()
return s
}
// Start launches the subscription timers and exit on expire
func (sc *SubscriptionClient) Start(ctx context.Context) {
log.Infof("Starting a new %s subscription for %s", sc.subscriptionType, timestamp.SockaddrToIP(sc.eclisa))
sc.setRunning(true)
over := fmt.Sprintf("Subscription %s is over for %s", sc.subscriptionType, timestamp.SockaddrToIP(sc.eclisa))
// Send first message right away
if sc.subscriptionType != ptp.MessageDelayResp {
sc.Once()
}
intervalTicker := time.NewTicker(sc.interval)
oldInterval := sc.interval
defer intervalTicker.Stop()
defer sc.setRunning(false)
for {
select {
case <-ctx.Done():
log.Infof(over)
// TODO send cancellation
return
case <-intervalTicker.C:
if sc.Expired() {
log.Infof(over)
// TODO send cancellation
return
}
// check if interval changed, maybe update our ticker
if oldInterval != sc.interval {
intervalTicker.Reset(sc.interval)
oldInterval = sc.interval
}
if sc.subscriptionType != ptp.MessageDelayResp {
// Add myself to the worker queue
sc.Once()
}
}
}
}
// Once adds itself to the worker queue once
func (sc *SubscriptionClient) Once() {
sc.queue <- sc
}
// Expired checks if the subscription expired or not
func (sc *SubscriptionClient) Expired() bool {
sc.Lock()
defer sc.Unlock()
return time.Now().After(sc.expire)
}
// Stop stops the subscription
func (sc *SubscriptionClient) Stop() {
sc.Lock()
defer sc.Unlock()
// Simply set the expiration time and subscription will be stopped
sc.expire = time.Now()
}
// setRunning atomically sets running
func (sc *SubscriptionClient) setRunning(running bool) {
sc.Lock()
defer sc.Unlock()
sc.running = running
}
// setExpire atomically sets expire
func (sc *SubscriptionClient) setExpire(expire time.Time) {
sc.Lock()
defer sc.Unlock()
sc.expire = expire
}
// setInterval atomically sets interval
func (sc *SubscriptionClient) setInterval(interval time.Duration) {
sc.Lock()
defer sc.Unlock()
sc.interval = interval
}
// Running returns the running bool
func (sc *SubscriptionClient) Running() bool {
sc.Lock()
defer sc.Unlock()
return sc.running
}
// IncSequenceID adds 1 to a sequence id
func (sc *SubscriptionClient) IncSequenceID() {
sc.sequenceID++
}
func (sc *SubscriptionClient) initSync() {
sc.syncP = &ptp.SyncDelayReq{
Header: ptp.Header{
SdoIDAndMsgType: ptp.NewSdoIDAndMsgType(ptp.MessageSync, 0),
Version: ptp.Version,
MessageLength: uint16(binary.Size(ptp.SyncDelayReq{})),
DomainNumber: 0,
FlagField: ptp.FlagUnicast | ptp.FlagTwoStep,
SequenceID: 0,
SourcePortIdentity: ptp.PortIdentity{
PortNumber: 1,
ClockIdentity: sc.serverConfig.clockIdentity,
},
LogMessageInterval: 0x7f,
ControlField: 0,
},
}
}
// UpdateSync updates ptp Sync packet
func (sc *SubscriptionClient) UpdateSync() {
sc.syncP.SequenceID = sc.sequenceID
}
// Sync returns ptp Sync packet
func (sc *SubscriptionClient) Sync() *ptp.SyncDelayReq {
return sc.syncP
}
func (sc *SubscriptionClient) initFollowup() {
sc.followupP = &ptp.FollowUp{
Header: ptp.Header{
SdoIDAndMsgType: ptp.NewSdoIDAndMsgType(ptp.MessageFollowUp, 0),
Version: ptp.Version,
MessageLength: uint16(binary.Size(ptp.FollowUp{})),
DomainNumber: 0,
FlagField: ptp.FlagUnicast,
SequenceID: 0,
SourcePortIdentity: ptp.PortIdentity{
PortNumber: 1,
ClockIdentity: sc.serverConfig.clockIdentity,
},
LogMessageInterval: 0,
ControlField: 2,
},
FollowUpBody: ptp.FollowUpBody{
PreciseOriginTimestamp: ptp.NewTimestamp(time.Now()),
},
}
}
// UpdateFollowup updates ptp Follow Up packet
func (sc *SubscriptionClient) UpdateFollowup(hwts time.Time) {
i, _ := ptp.NewLogInterval(sc.interval)
sc.followupP.SequenceID = sc.sequenceID
sc.followupP.LogMessageInterval = i
sc.followupP.PreciseOriginTimestamp = ptp.NewTimestamp(hwts)
}
// Followup returns ptp Follow Up packet
func (sc *SubscriptionClient) Followup() *ptp.FollowUp {
return sc.followupP
}
func (sc *SubscriptionClient) initAnnounce() {
sc.announceP = &ptp.Announce{
Header: ptp.Header{
SdoIDAndMsgType: ptp.NewSdoIDAndMsgType(ptp.MessageAnnounce, 0),
Version: ptp.Version,
MessageLength: uint16(binary.Size(ptp.Announce{})),
DomainNumber: 0,
FlagField: ptp.FlagUnicast | ptp.FlagPTPTimescale,
SequenceID: 0,
SourcePortIdentity: ptp.PortIdentity{
PortNumber: 1,
ClockIdentity: sc.serverConfig.clockIdentity,
},
LogMessageInterval: 0,
ControlField: 5,
},
AnnounceBody: ptp.AnnounceBody{
CurrentUTCOffset: 0,
Reserved: 0,
GrandmasterPriority1: 128,
GrandmasterClockQuality: ptp.ClockQuality{
ClockClass: 0,
ClockAccuracy: 0,
OffsetScaledLogVariance: 23008,
},
GrandmasterPriority2: 128,
GrandmasterIdentity: sc.serverConfig.clockIdentity,
StepsRemoved: 0,
TimeSource: ptp.TimeSourceGNSS,
},
}
}
// UpdateAnnounce updates ptp Announce packet
func (sc *SubscriptionClient) UpdateAnnounce() {
i, _ := ptp.NewLogInterval(sc.interval)
sc.announceP.SequenceID = sc.sequenceID
sc.announceP.LogMessageInterval = i
sc.announceP.CurrentUTCOffset = int16(sc.serverConfig.UTCOffset.Seconds())
sc.announceP.GrandmasterClockQuality.ClockClass = uint8(sc.serverConfig.ClockClass)
sc.announceP.GrandmasterClockQuality.ClockAccuracy = uint8(sc.serverConfig.ClockAccuracy)
}
// Announce returns ptp Announce packet
func (sc *SubscriptionClient) Announce() *ptp.Announce {
return sc.announceP
}
func (sc *SubscriptionClient) initDelayResp() {
sc.delayRespP = &ptp.DelayResp{
Header: ptp.Header{
SdoIDAndMsgType: ptp.NewSdoIDAndMsgType(ptp.MessageDelayResp, 0),
Version: ptp.Version,
MessageLength: uint16(binary.Size(ptp.DelayResp{})),
DomainNumber: 0,
FlagField: ptp.FlagUnicast,
SequenceID: 0,
SourcePortIdentity: ptp.PortIdentity{
PortNumber: 1,
ClockIdentity: sc.serverConfig.clockIdentity,
},
LogMessageInterval: 0x7f,
ControlField: 3,
CorrectionField: 0,
},
DelayRespBody: ptp.DelayRespBody{},
}
}
// UpdateDelayResp updates ptp Delay Response packet
func (sc *SubscriptionClient) UpdateDelayResp(h *ptp.Header, received time.Time) {
sc.delayRespP.SequenceID = h.SequenceID
sc.delayRespP.CorrectionField = h.CorrectionField
sc.delayRespP.DelayRespBody = ptp.DelayRespBody{
ReceiveTimestamp: ptp.NewTimestamp(received),
RequestingPortIdentity: h.SourcePortIdentity,
}
}
// DelayResp returns ptp Delay Response packet
func (sc *SubscriptionClient) DelayResp() *ptp.DelayResp {
return sc.delayRespP
}
func (sc *SubscriptionClient) initGrant() {
sc.grant = &ptp.Signaling{
Header: ptp.Header{
Version: ptp.Version,
MessageLength: uint16(binary.Size(ptp.Header{}) + binary.Size(ptp.PortIdentity{}) + binary.Size(ptp.GrantUnicastTransmissionTLV{})),
FlagField: ptp.FlagUnicast,
SourcePortIdentity: ptp.PortIdentity{
PortNumber: 1,
ClockIdentity: sc.serverConfig.clockIdentity,
},
},
TargetPortIdentity: ptp.PortIdentity{},
TLVs: []ptp.TLV{
&ptp.GrantUnicastTransmissionTLV{
TLVHead: ptp.TLVHead{TLVType: ptp.TLVGrantUnicastTransmission, LengthField: uint16(binary.Size(ptp.GrantUnicastTransmissionTLV{}) - binary.Size(ptp.TLVHead{}))},
Reserved: 0,
Renewal: 1,
},
},
}
}
// UpdateGrant updates ptp Signaling packet granting the requested subscription
func (sc *SubscriptionClient) UpdateGrant(sg *ptp.Signaling, mt ptp.UnicastMsgTypeAndFlags, interval ptp.LogInterval, duration uint32) {
sc.grant.Header.SdoIDAndMsgType = sg.Header.SdoIDAndMsgType
sc.grant.Header.DomainNumber = sg.Header.DomainNumber
sc.grant.Header.MinorSdoID = sg.Header.MinorSdoID
sc.grant.Header.CorrectionField = sg.Header.CorrectionField
sc.grant.Header.MessageTypeSpecific = sg.Header.MessageTypeSpecific
sc.grant.Header.SequenceID = sg.Header.SequenceID
sc.grant.Header.ControlField = sg.Header.ControlField
sc.grant.Header.LogMessageInterval = sg.Header.LogMessageInterval
sc.grant.TargetPortIdentity = sg.SourcePortIdentity
tlv := sc.grant.TLVs[0].(*ptp.GrantUnicastTransmissionTLV)
tlv.MsgTypeAndReserved = mt
tlv.LogInterMessagePeriod = interval
tlv.DurationField = duration
sc.grant.TLVs[0] = tlv
}
// Grant returns ptp Signaling packet granting the requested subscription
func (sc *SubscriptionClient) Grant() *ptp.Signaling {
return sc.grant
}