go/services/leasedlock/lock.go (117 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. package leasedlock import ( "context" "time" "github.com/Azure/iot-operations-sdks/go/services/statestore" ) type ( // Lock provides a distributed mutex-like lock based on an underlying state // store. Lock[K, V Bytes] struct{ *Lease[K, V] } // Edit provides a callback to edit a value under protection of a lock. // Given the current value when the lock is acquired and whether that value // was present, it should return the updated value and whether the new value // should be set (true) or deleted (false). Edit[V Bytes] = func(context.Context, V, bool) (V, bool, error) ) // NewLock creates a new distributed lock from an underlying state store client // and a lock name. func NewLock[K, V Bytes]( client *statestore.Client[K, V], name K, opt ...Option, ) Lock[K, V] { return Lock[K, V]{NewLease(client, name, opt...)} } // Lock the lock object, blocking until locked or the request fails. Note that // cancelling the context passed to this method will prevent the underlying // notification from stopping; it is recommended to use WithTimeout instead. func (l Lock[K, V]) Lock( ctx context.Context, duration time.Duration, opt ...Option, ) error { var opts Options opts.Apply(opt) // Register notification first so we don't miss a delete. if err := l.client.KeyNotify(ctx, l.Name, opts.keynotify()); err != nil { return err } //nolint:errcheck // TODO: Is there anything useful to do if this fails? defer l.client.KeyNotifyStop(ctx, l.Name, opts.keynotify()) kn, done := l.client.Notify(l.Name) defer done() // Respect any requested timeout while waiting for the delete. if opts.Timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, opts.Timeout) defer cancel() } for { ok, err := l.Acquire(ctx, duration, opt...) if err != nil { return err } if ok { return nil } if err := waitForDelete(ctx, kn); err != nil { return err } } } func waitForDelete[K, V Bytes]( ctx context.Context, kn <-chan statestore.Notify[K, V], ) error { for { select { case n := <-kn: if n.Operation == "DELETE" { return nil } case <-ctx.Done(): return context.Cause(ctx) } } } // Unlock the lock object. func (l Lock[K, V]) Unlock( ctx context.Context, opt ...Option, ) error { return l.Release(ctx, opt...) } // Edit a key under the protection of this lock. func (l Lock[K, V]) Edit( ctx context.Context, key K, duration time.Duration, edit Edit[V], opt ...Option, ) error { var opts Options opts.Apply(opt) var done bool var err error for err == nil && !done { done, err = l.edit(ctx, key, duration, edit, &opts) } return err } func (l Lock[K, V]) edit( ctx context.Context, key K, duration time.Duration, edit Edit[V], opts *Options, ) (bool, error) { err := l.Lock(ctx, duration, opts) if err != nil { return false, err } //nolint:errcheck // TODO: Is there anything useful to do if this fails? defer l.Unlock(ctx, opts) ft, err := l.Token(ctx) if err != nil { return false, err } wft := statestore.WithFencingToken(ft) get, err := l.client.Get(ctx, key, opts.get()) if err != nil { return false, err } upd, set, err := edit(ctx, get.Value, !get.Version.IsZero()) if err != nil { return false, err } if !set { res, err := l.client.Del(ctx, key, opts.del(), wft) return err == nil && res.Value > 0, err } res, err := l.client.Set(ctx, key, upd, opts.set(), wft) return err == nil && res.Value, err }