go/services/leasedlock/lease.go (191 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. package leasedlock import ( "context" "errors" "time" "github.com/Azure/iot-operations-sdks/go/protocol/hlc" "github.com/Azure/iot-operations-sdks/go/services/statestore" ) type ( // Bytes represents generic byte data. Bytes = statestore.Bytes // Lease provides a distributed lease based on an underlying state store. Lease[K, V Bytes] struct { Name K SessionID string client *statestore.Client[K, V] result result done context.CancelFunc mu much } // Change represents an observed change in the lease holder. Change struct { Held bool Holder string } // Result of the previous lease attempt, with its own lock for concurrency. result struct { token hlc.HybridLogicalClock error error mu much } ) var ( // ErrNoLease is used in absence of other errors to indicate that the lease // has not been acquired. ErrNoLease = errors.New("lease not acquired") // ErrRenewing indicates that renew was specified on a lease that is already // renewing. ErrRenewing = errors.New("lease already renewing") ) // NewLease creates a new distributed lease from an underlying state store // client and a lease name. func NewLease[K, V Bytes]( client *statestore.Client[K, V], name K, opt ...Option, ) *Lease[K, V] { var opts Options opts.Apply(opt) return &Lease[K, V]{ Name: name, SessionID: opts.SessionID, client: client, result: result{ error: ErrNoLease, mu: make(chan struct{}, 1), }, mu: make(chan struct{}, 1), } } func (l *Lease[K, V]) id(opts *Options) V { switch { case opts.SessionID != "": return V(l.client.ID() + ":" + opts.SessionID) case l.SessionID != "": return V(l.client.ID() + ":" + l.SessionID) default: return V(l.client.ID()) } } // Token returns the current fencing token value or the error that caused the // lease to fail. Note that this function will block if the lease is currently // renewing and can be cancelled using its context. func (l *Lease[K, V]) Token( ctx context.Context, ) (hlc.HybridLogicalClock, error) { if err := l.result.mu.Lock(ctx); err != nil { return hlc.HybridLogicalClock{}, err } defer l.result.mu.Unlock() return l.result.token, l.result.error } // Acquire performs a single attempt to acquire the lease, returning whether it // was successful. If the lease was already held by another client, this will // return false with no error. func (l *Lease[K, V]) Acquire( ctx context.Context, duration time.Duration, opt ...Option, ) (bool, error) { var opts Options opts.Apply(opt) if err := l.mu.Lock(ctx); err != nil { return false, err } defer l.mu.Unlock() // Error on duplicate renews so we don't start up conflicting goroutines. if opts.Renew > 0 && l.done != nil { return false, ErrRenewing } ok, err := l.try(ctx, duration, &opts) if !ok || err != nil { return ok, err } // If specified, renew until an attempt fails or the lease is released. if opts.Renew > 0 { var ctx context.Context ctx, l.done = context.WithCancel(context.Background()) go func() { for { select { case <-time.After(opts.Renew): ok, _ := l.try(ctx, duration, &opts) if !ok { return } case <-ctx.Done(): } } }() } return true, nil } func (l *Lease[K, V]) try( ctx context.Context, duration time.Duration, opts *Options, ) (bool, error) { if err := l.result.mu.Lock(ctx); err != nil { return false, err } defer l.result.mu.Unlock() res, err := l.client.Set( ctx, l.Name, l.id(opts), opts.set(), statestore.WithCondition(statestore.NotExistsOrEqual), statestore.WithExpiry(duration), ) if err != nil { l.result.token, l.result.error = hlc.HybridLogicalClock{}, err return false, err } if !res.Value { l.result.token, l.result.error = hlc.HybridLogicalClock{}, ErrNoLease return false, nil } l.result.token, l.result.error = res.Version, nil return true, nil } // Release the lease. func (l *Lease[K, V]) Release( ctx context.Context, opt ...Option, ) error { var opts Options opts.Apply(opt) // Stop any renew. if err := l.mu.Lock(ctx); err != nil { return err } defer l.mu.Unlock() if l.done != nil { l.done() l.done = nil } // Reset the token. if err := l.result.mu.Lock(ctx); err != nil { return err } defer l.result.mu.Unlock() l.result.token, l.result.error = hlc.HybridLogicalClock{}, ErrNoLease // Release the lease. _, err := l.client.VDel(ctx, l.Name, l.id(&opts), opts.vdel()) return err } // Holder gets the current holder of the lease and an indicator of whether the // lease is currently held. func (l *Lease[K, V]) Holder( ctx context.Context, opt ...Option, ) (string, bool, error) { var opts Options opts.Apply(opt) res, err := l.client.Get(ctx, l.Name, opts.get()) if err != nil { return "", false, err } return string(res.Value), !res.Version.IsZero(), nil } // ObserveStart initializes observation of lease holder changes. It should be // paired with a call to ObserveStop. func (l *Lease[K, V]) ObserveStart(ctx context.Context, opt ...Option) error { var opts Options opts.Apply(opt) return l.client.KeyNotify(ctx, l.Name, opts.keynotify()) } // ObserveStop terminates observation of lease holder changes. It should only be // called once per successfull call to ObserveStart (but may be retried in case // of failure). func (l *Lease[K, V]) ObserveStop(ctx context.Context, opt ...Option) error { var opts Options opts.Apply(opt) return l.client.KeyNotifyStop(ctx, l.Name, opts.keynotify()) } // Observe requests a lease holder change notification channel for this lease. // It returns the channel and a function to remove and close that channel. Note // that ObserveStart must be called to actually start observing (though changes // may be received on this channel if ObserveStart had already been called // previously). func (l *Lease[K, V]) Observe() (<-chan Change, func()) { obs := make(chan Change) kn, done := l.client.Notify(l.Name) // Spin up a simple translation of NOTIFY to Change. Calling done() will // close the kn channel, terminating this loop. go func() { defer close(obs) for n := range kn { obs <- Change{ Held: n.Operation != "DELETE", Holder: string(n.Value), } } }() return obs, done }