go/protocol/hlc/hlc.go (192 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. package hlc import ( "fmt" "strconv" "strings" "sync" "time" "github.com/Azure/iot-operations-sdks/go/internal/options" "github.com/Azure/iot-operations-sdks/go/internal/wallclock" "github.com/Azure/iot-operations-sdks/go/protocol/errors" "github.com/google/uuid" ) type ( // HybridLogicalClock provides a combination of physical and logical clocks // used to track timestamps across a distributed system. HybridLogicalClock struct { timestamp time.Time counter uint64 nodeID string opt *HybridLogicalClockOptions } // Global provides a shared instance of an HLC. Only one of these should // typically be created per application. Global struct { hlc HybridLogicalClock mu sync.Mutex opt HybridLogicalClockOptions } // HybridLogicalClockOption represents a single HLC option. HybridLogicalClockOption interface { hlc(*HybridLogicalClockOptions) } // HybridLogicalClockOptions are the resolved HLC options. HybridLogicalClockOptions struct { MaxClockDrift time.Duration } // WithMaxClockDrift specifies how long HLCs are allowed to drift from the // wall clock before they are considered no longer valid. WithMaxClockDrift time.Duration ) // DefaultMaxClockDrift is the default maximum HLC clock drift if none is // otherwise specified (one minute). const DefaultMaxClockDrift = time.Minute // New creates a new shared instance of an HLC. Only one of these should // typically be created per application. func New(opt ...HybridLogicalClockOption) *Global { g := &Global{} g.opt.Apply(opt) if g.opt.MaxClockDrift == 0 { g.opt.MaxClockDrift = DefaultMaxClockDrift } g.hlc = HybridLogicalClock{ timestamp: now(), nodeID: uuid.Must(uuid.NewV7()).String(), opt: &g.opt, } return g } // Get syncs the shared HLC instance to the current time and returns it. func (g *Global) Get() (HybridLogicalClock, error) { g.mu.Lock() defer g.mu.Unlock() hlc, err := g.hlc.Update(HybridLogicalClock{}) if err != nil { return HybridLogicalClock{}, err } g.hlc = hlc return g.hlc, nil } // Set syncs the shared HLC instance to the given HLC. func (g *Global) Set(hlc HybridLogicalClock) error { g.mu.Lock() defer g.mu.Unlock() hlc, err := g.hlc.Update(hlc) if err != nil { return err } g.hlc = hlc return nil } // UTC returns the physical clock component of the HTC in UTC. func (hlc HybridLogicalClock) UTC() time.Time { // This is always set to UTC, so no need to normalize. return hlc.timestamp } // Update an HLC based on another one and return the new value. func (hlc HybridLogicalClock) Update( other HybridLogicalClock, ) (HybridLogicalClock, error) { // Don't update from the same node. if other.nodeID == hlc.nodeID { return hlc, nil } wall := now() // Note: The order of checks ensures that a zeroed other HLC behaves as if // it were the same as the wall clock. updated := HybridLogicalClock{ timestamp: wall, nodeID: hlc.nodeID, opt: hlc.opt, } switch { case wall.After(hlc.timestamp) && wall.After(other.timestamp): // Since we're setting the HLC to the wall clock, we don't need to // validate it further. return updated, nil case hlc.timestamp.Equal(other.timestamp): updated.timestamp = hlc.timestamp updated.counter = max(hlc.counter, other.counter) + 1 case hlc.timestamp.After(other.timestamp): updated.timestamp = hlc.timestamp updated.counter = hlc.counter + 1 default: updated.timestamp = other.timestamp updated.counter = other.counter + 1 } switch { // Since the unsigned counter was incremented by 1, a value of 0 here // indicates integer overflow. case updated.counter == 0: return HybridLogicalClock{}, &errors.Client{ Message: "integer overflow in HLC counter", Kind: errors.StateInvalid{PropertyName: "Counter"}, } case updated.timestamp.Sub(wall) > updated.opt.MaxClockDrift: return HybridLogicalClock{}, &errors.Client{ Message: "clock drift exceeds maximum", Kind: errors.StateInvalid{PropertyName: "MaxClockDrift"}, } default: return updated, nil } } // Compare this HLC value with another one. func (hlc HybridLogicalClock) Compare(other HybridLogicalClock) int { if hlc.timestamp.Equal(other.timestamp) { switch { case hlc.counter > other.counter: return 1 case hlc.counter < other.counter: return -1 default: return strings.Compare(hlc.nodeID, other.nodeID) } } return hlc.timestamp.Compare(other.timestamp) } // IsZero returns whether this HLC matches its zero value. func (hlc HybridLogicalClock) IsZero() bool { // Only check the timestamp, since if it's a zero time the other values are // not meaningful. return hlc.timestamp.IsZero() } // String retrieves a serialized form of the HLC. func (hlc HybridLogicalClock) String() string { return fmt.Sprintf( "%015d:%05d:%s", hlc.timestamp.UnixMilli(), hlc.counter, hlc.nodeID, ) } // Get the current time in UTC with ms precision. func now() time.Time { return wallclock.Instance.Now().UTC().Truncate(time.Millisecond) } // Parse the HLC from a string. func (g *Global) Parse(name, value string) (HybridLogicalClock, error) { parts := strings.Split(value, ":") if len(parts) != 3 { return HybridLogicalClock{}, &errors.Client{ Message: "HLC must contain three segments separated by ':'", Kind: errors.HeaderInvalid{ HeaderName: name, HeaderValue: value, }, } } timestamp, err := strconv.ParseInt(parts[0], 10, 64) if err != nil { return HybridLogicalClock{}, &errors.Client{ Message: "first HLC segment is not a valid integer", Kind: errors.HeaderInvalid{ HeaderName: name, HeaderValue: value, }, Nested: err, } } count, err := strconv.ParseUint(parts[1], 10, 64) if err != nil { return HybridLogicalClock{}, &errors.Client{ Message: "second HLC segment is not a valid integer", Kind: errors.HeaderInvalid{ HeaderName: name, HeaderValue: value, }, Nested: err, } } return HybridLogicalClock{ timestamp: time.UnixMilli(timestamp).UTC(), counter: count, nodeID: parts[2], opt: &g.opt, }, nil } // Apply resolves the provided list of options. func (o *HybridLogicalClockOptions) Apply( opts []HybridLogicalClockOption, rest ...HybridLogicalClockOption, ) { for opt := range options.Apply[HybridLogicalClockOption](opts, rest...) { opt.hlc(o) } } func (o *HybridLogicalClockOptions) hlc(opt *HybridLogicalClockOptions) { if o != nil { *opt = *o } } func (o WithMaxClockDrift) hlc(opt *HybridLogicalClockOptions) { opt.MaxClockDrift = time.Duration(o) }