ptp/simpleclient/client.go (423 lines of code) (raw):

/* Copyright (c) Facebook, Inc. and its affiliates. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package simpleclient import ( "context" "fmt" "net" "time" "github.com/fatih/color" log "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" "golang.org/x/sys/unix" ptp "github.com/facebook/time/ptp/protocol" "github.com/facebook/time/timestamp" ) // re-export timestamping const ( // HWTIMESTAMP is a hardware timestamp HWTIMESTAMP = timestamp.HWTIMESTAMP // SWTIMESTAMP is a software timestamp SWTIMESTAMP = timestamp.SWTIMESTAMP ) type state int const ( stateInit = iota stateInProgress stateDone ) var stateToString = map[state]string{ stateInit: "INIT", stateDone: "DONE", stateInProgress: "IN_PROGRESS", } func (s state) String() string { return stateToString[s] } // inPacket is input packet data + receive timestamp type inPacket struct { data []byte ts time.Time } // UDPConn describes what functionality we expect from UDP connection type UDPConn interface { ReadFromUDP(b []byte) (int, *net.UDPAddr, error) WriteTo(b []byte, addr net.Addr) (int, error) Close() error } // UDPConnWithTS describes what functionality we expect from UDP connection that allows us to read TX timestamps type UDPConnWithTS interface { UDPConn WriteToWithTS(b []byte, addr net.Addr) (int, time.Time, error) } type udpConnTS struct { *net.UDPConn } func (c *udpConnTS) WriteToWithTS(b []byte, addr net.Addr) (int, time.Time, error) { n, err := c.WriteTo(b, addr) if err != nil { return 0, time.Time{}, err } // get FD of the connection. Can be optimized by doing this when connection is created connFd, err := timestamp.ConnFd(c.UDPConn) if err != nil { return 0, time.Time{}, fmt.Errorf("failed to get conn fd udp connection: %v", err) } hwts, _, err := timestamp.ReadTXtimestamp(connFd) if err != nil { return 0, time.Time{}, fmt.Errorf("failed to get timestamp of last packet: %v", err) } return n, hwts, nil } // Config specifies Client run options type Config struct { // address of a server to talk to Address string // interface name that we'll use to send/receive packets Iface string // timeout of whole session Timeout time.Duration // for how long we'll request unicast transmission from server Duration time.Duration // what type of typestamping to use Timestamping string } // Client is a very simplified PTPv2 unicast client. // Whenever it has all the data to calculate offset/delay/etc // it will call provided callback function with `MeasurementResult`. type Client struct { cfg *Config // state management // packet sequence counters genSequence uint16 eventSequence uint16 // state enum state state // chan for received packets regardless of port inChan chan *inPacket // listening connection on port 320 genConn UDPConn // listening connection on port 319 eventConn UDPConnWithTS // addresses of server we'll talk to, for both 319 and 320 port genAddr *net.UDPAddr eventAddr *net.UDPAddr // our clockID derived from MAC address clockID ptp.ClockIdentity // where we store timestamps m *measurements // what to do when we receive latest measurement callback func(*MeasurementResult) } // New initializes new PTPv2 unicast client func New(cfg *Config, callback func(*MeasurementResult)) *Client { c := &Client{ inChan: make(chan *inPacket, 10), m: newMeasurements(), cfg: cfg, callback: callback, } return c } func (c *Client) sendGeneralMsg(p ptp.Packet) (uint16, error) { seq := c.genSequence p.SetSequence(c.genSequence) b, err := ptp.Bytes(p) if err != nil { return 0, err } // send packet _, err = c.genConn.WriteTo(b, c.genAddr) if err != nil { return 0, err } log.Debugf("sent packet via port %d to %v", ptp.PortGeneral, c.genAddr) c.genSequence++ return seq, nil } func (c *Client) sendEventMsg(p ptp.Packet) (uint16, time.Time, error) { seq := c.eventSequence p.SetSequence(c.eventSequence) b, err := ptp.Bytes(p) if err != nil { return 0, time.Time{}, err } // send packet _, hwts, err := c.eventConn.WriteToWithTS(b, c.eventAddr) if err != nil { return 0, time.Time{}, err } c.eventSequence++ log.Debugf("sent packet via port %d to %v", ptp.PortEvent, c.eventAddr) return seq, hwts, nil } func (c *Client) setup(ctx context.Context, eg *errgroup.Group) error { iface, err := net.InterfaceByName(c.cfg.Iface) if err != nil { return err } cid, err := ptp.NewClockIdentity(iface.HardwareAddr) if err != nil { return err } log.Infof("using ClockIdentity %s, talking to %v using Two-Step Unicast PTPv2 protocol", cid, c.cfg.Address) c.clockID = cid // addresses // where to send to genAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(c.cfg.Address, fmt.Sprintf("%d", ptp.PortGeneral))) if err != nil { return err } eventAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(c.cfg.Address, fmt.Sprintf("%d", ptp.PortEvent))) if err != nil { return err } // bind to general port genConn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("::"), Port: ptp.PortGeneral}) if err != nil { return err } c.genConn = genConn c.genAddr = genAddr // bind to event port eventConn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("::"), Port: ptp.PortEvent}) if err != nil { return err } // get FD of the connection. Can be optimized by doing this when connection is created connFd, err := timestamp.ConnFd(eventConn) if err != nil { return err } // we need to enable HW or SW timestamps on event port switch c.cfg.Timestamping { case "": // auto-detection if err := timestamp.EnableHWTimestamps(connFd, c.cfg.Iface); err != nil { if err := timestamp.EnableSWTimestamps(connFd); err != nil { return fmt.Errorf("failed to enable timestamps on port %d: %v", ptp.PortEvent, err) } log.Warningf("Failed to enable hardware timestamps on port %d, falling back to software timestamps", ptp.PortEvent) } else { log.Infof("Using hardware timestamps") } case HWTIMESTAMP: if err := timestamp.EnableHWTimestamps(connFd, c.cfg.Iface); err != nil { return fmt.Errorf("failed to enable hardware timestamps on port %d: %v", ptp.PortEvent, err) } case SWTIMESTAMP: if err := timestamp.EnableSWTimestamps(connFd); err != nil { return fmt.Errorf("failed to enable software timestamps on port %d: %v", ptp.PortEvent, err) } default: return fmt.Errorf("unknown type of typestamping: %q", c.cfg.Timestamping) } // set it to blocking mode, otherwise recvmsg will just return with nothing most of the time if err := unix.SetNonblock(connFd, false); err != nil { return fmt.Errorf("failed to set event socket to blocking: %w", err) } c.eventConn = &udpConnTS{eventConn} c.eventAddr = eventAddr // get packets from general port eg.Go(func() error { // it's done in non-blocking way, so if context is cancelled we exit correctly doneChan := make(chan error, 1) go func() { for { response := make([]uint8, 1024) n, addr, err := genConn.ReadFromUDP(response) if err != nil { doneChan <- err return } log.Debugf("got packet on port 320, n = %v, addr = %v", n, addr) if !addr.IP.Equal(genAddr.IP) { log.Warningf("ignoring packets from server %v", addr) } c.inChan <- &inPacket{data: response[:n]} } }() select { case <-ctx.Done(): log.Debugf("cancelled general port receiver") return ctx.Err() case err = <-doneChan: return err } }) // get packets from event port eg.Go(func() error { // it's done in non-blocking way, so if context is cancelled we exit correctly doneChan := make(chan error, 1) go func() { for { response, addr, rxtx, err := timestamp.ReadPacketWithRXTimestamp(connFd) if err != nil { doneChan <- err return } log.Debugf("got packet on port 319, addr = %v", addr) if !timestamp.SockaddrToIP(addr).Equal(eventAddr.IP) { log.Warningf("ignoring packets from server %v", addr) } c.inChan <- &inPacket{data: response, ts: rxtx} } }() select { case <-ctx.Done(): log.Debugf("cancelled event port receiver") return ctx.Err() case err = <-doneChan: return err } }) return nil } // handleGrantUnicast handles SIGNALLING packet that grants parts of unicast transmission func (c *Client) handleGrantUnicast(tlv *ptp.GrantUnicastTransmissionTLV) error { msgType := tlv.MsgTypeAndReserved.MsgType() c.logReceive(ptp.MessageSignaling, "unicast grant for %s", msgType) switch msgType { case ptp.MessageAnnounce: // we received response, no need to request more grants for Announce c.setState(stateInProgress) if tlv.DurationField == 0 { return fmt.Errorf("server denied us grant for %s", msgType) } // ask for sync messages seq, err := c.sendGeneralMsg(reqUnicast(c.clockID, c.cfg.Duration, ptp.MessageSync)) if err != nil { return err } c.logSent(ptp.MessageSignaling, "for %s, seq=%d", ptp.MessageSync, seq) case ptp.MessageSync: if tlv.DurationField == 0 { return fmt.Errorf("server denied us grant for %s", msgType) } // ask for delay_resp messages seq, err := c.sendGeneralMsg(reqUnicast(c.clockID, c.cfg.Duration, ptp.MessageDelayResp)) if err != nil { return err } c.logSent(ptp.MessageSignaling, "for %s, seq=%d", ptp.MessageDelayResp, seq) case ptp.MessageDelayResp: if tlv.DurationField == 0 { return fmt.Errorf("server denied us grant for %s", msgType) } log.Infof("unicast handshake complete") default: return fmt.Errorf("got unexpected grant for %s", msgType) } return nil } // handleCancelUnicast handles SIGNALLING packet that marks end of unicast transmission func (c *Client) handleCancelUnicast(tlv *ptp.CancelUnicastTransmissionTLV) error { c.logReceive(ptp.MessageSignaling, "unicast transmission cancelled, dying") seq, err := c.sendGeneralMsg(reqAckCancelUnicast(c.clockID, tlv.MsgTypeAndFlags.MsgType())) if err != nil { return err } c.logSent(ptp.MessageSignaling, "ACK CANCEL for %s, seq=%d", tlv.MsgTypeAndFlags.MsgType(), seq) // real client should have answered to all CANCEL messages, but we won't c.setState(stateDone) return nil } // handleAnnounce handles ANNOUNCE packet and records UTC offset from it's data func (c *Client) handleAnnounce(b *ptp.Announce) error { c.logReceive(ptp.MessageAnnounce, "seq=%d, gmIdentity=%s, gmTimeSource=%s, stepsRemoved=%d", b.SequenceID, b.GrandmasterIdentity, b.TimeSource, b.StepsRemoved) c.m.currentUTCoffset = time.Duration(b.CurrentUTCOffset) * time.Second return nil } // handleSync handles SYNC packet and adds send timestamp to measurements func (c *Client) handleSync(b *ptp.SyncDelayReq, ts time.Time) error { c.logReceive(ptp.MessageSync, "seq=%d, our ReceiveTimestamp=%v", b.SequenceID, ts) c.m.addSync(b.SequenceID, ts) return nil } // handleDelay handles DELAY packet and adds ReceiveTimestamp to measurements func (c *Client) handleDelay(b *ptp.DelayResp) error { c.logReceive(ptp.MessageDelayResp, "seq=%d, server ReceiveTimestamp=%v", b.SequenceID, b.ReceiveTimestamp.Time()) // store data in measurements c.m.addDelayResp(b.SequenceID, b.ReceiveTimestamp.Time()) // do whatever needs to be done with current measurements res, err := c.m.latest() if err != nil { log.Warningf("failed to get measurements: %v", err) return nil } c.callback(res) return nil } // handleFollowUp handles FOLLOW_UP packet and sends DELAY_REQ packet func (c *Client) handleFollowUp(b *ptp.FollowUp) error { c.logReceive(ptp.MessageFollowUp, "seq=%d, server PreciseOriginTimestamp=%v", b.SequenceID, b.PreciseOriginTimestamp.Time()) c.m.addFollowUp(b.SequenceID, b.PreciseOriginTimestamp.Time()) // ask for delay seq, hwts, err := c.sendEventMsg(reqDelay(c.clockID)) if err != nil { return err } c.m.addDelayReq(seq, hwts) c.logSent(ptp.MessageDelayReq, "seq=%d, our TransmissionTimestamp=%v", seq, hwts) return nil } // dispatch handler based on msg type func (c *Client) handleMsg(msg *inPacket) error { msgType, err := ptp.ProbeMsgType(msg.data) if err != nil { return err } switch msgType { case ptp.MessageSignaling: signaling := &ptp.Signaling{} if err := ptp.FromBytes(msg.data, signaling); err != nil { return fmt.Errorf("reading signaling msg: %w", err) } for _, tlv := range signaling.TLVs { switch v := tlv.(type) { case *ptp.GrantUnicastTransmissionTLV: if err := c.handleGrantUnicast(v); err != nil { return err } case *ptp.CancelUnicastTransmissionTLV: if err := c.handleCancelUnicast(v); err != nil { return err } default: return fmt.Errorf("got unsupported TLV type %s(%d)", tlv.Type(), tlv.Type()) } } return nil case ptp.MessageAnnounce: announce := &ptp.Announce{} if err := ptp.FromBytes(msg.data, announce); err != nil { return fmt.Errorf("reading announce msg: %w", err) } return c.handleAnnounce(announce) case ptp.MessageSync: b := &ptp.SyncDelayReq{} if err := ptp.FromBytes(msg.data, b); err != nil { return fmt.Errorf("reading sync msg: %w", err) } return c.handleSync(b, msg.ts) case ptp.MessageDelayResp: b := &ptp.DelayResp{} if err := ptp.FromBytes(msg.data, b); err != nil { return fmt.Errorf("reading delay_resp msg: %w", err) } return c.handleDelay(b) case ptp.MessageFollowUp: b := &ptp.FollowUp{} if err := ptp.FromBytes(msg.data, b); err != nil { return fmt.Errorf("reading follow_up msg: %w", err) } return c.handleFollowUp(b) default: c.logReceive(msgType, "unsupported, ignoring") return nil } } // dedicated function just for logging state changes func (c *Client) setState(s state) { if c.state != s { log.Debugf("Changing state to %s", s) c.state = s } } // couple of helpers to log nice lines about happening communication func (c *Client) logSent(t ptp.MessageType, msg string, v ...interface{}) { log.Infof(color.GreenString("client -> %s (%s)", t, fmt.Sprintf(msg, v...))) } func (c *Client) logReceive(t ptp.MessageType, msg string, v ...interface{}) { log.Infof(color.BlueString("server -> %s (%s)", t, fmt.Sprintf(msg, v...))) } // Run is the main function, it makes client talk to server provided in config func (c *Client) Run() error { return c.runInternal(false) } // runInternal allows us to skip setup for unittests func (c *Client) runInternal(skipSetup bool) error { ctx, cancel := context.WithTimeout(context.Background(), c.cfg.Timeout) defer cancel() eg, ctx := errgroup.WithContext(ctx) if !skipSetup { if err := c.setup(ctx, eg); err != nil { return err } } eg.Go(func() error { for { select { case <-ctx.Done(): log.Debugf("cancelled main loop") return ctx.Err() case msg := <-c.inChan: if err := c.handleMsg(msg); err != nil { return err } default: switch c.state { case stateInit: seq, err := c.sendGeneralMsg(reqUnicast(c.clockID, c.cfg.Duration, ptp.MessageAnnounce)) if err != nil { return err } c.logSent(ptp.MessageSignaling, "for %s, seq=%d", ptp.MessageAnnounce, seq) time.Sleep(time.Second) case stateDone: cancel() return nil } } } }) return eg.Wait() } // Close connections func (c *Client) Close() { if c.eventConn != nil { c.eventConn.Close() } if c.genConn != nil { c.genConn.Close() } }