lib/dsstore/lock.go (88 lines of code) (raw):

// Copyright 2019 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package dsstore import ( "context" "time" "cloud.google.com/go/datastore" /* copybara-comment */ "google.golang.org/grpc/codes" /* copybara-comment */ "google.golang.org/grpc/status" /* copybara-comment */ "github.com/pborman/uuid" /* copybara-comment */ ) // Lock provides a distributed lock mechanism using datastore. // Datastore doesn't provide a transaction commit time, the times used are local // process times. Be aware of clock skew and add a small buffer to avoid // problems. // // Example: // l := dsstore.NewLock(c, "lockname") // d := 10*time.Second // // for { // // Respect the calling context. // select { // case <-ctx.Done(): // return ctx.Err() // default: // } // // Try to acquire the lock. // if err := l.Acquire(ctx, time.Now().Add(d)); err == nil { // // Lock acuqired. // break // } // // Lock could not be acquired. Sleep for a while and retry. // time.Sleep(time.Second) // } // // // [optional] Try to release the lock as soon as work is done. // // Note that explicit releasing of the lock is best-effort and // // cannot be gauranteed because of failures (e.g. crashes), // // if a process does not or fails to explicitly release the the lock, // // the lock will be automatically released after lock duration expires and // // will not cause a deadlock. // defer l.Release(ctx) // // // We only hold the lock for 10s, so we have a bit less than 10s remaining. // ctx, cancel := context.WithTimeout(ctx, 9*time.Second) // defer cancel() // // // Critical section that requires mutual exclusion. // ... // ... // Lock is the data for a lock. type Lock struct { // client for accessing Datastore. client *datastore.Client // name is the name of the datastore lock object. name string // holder is the uuid for the holder of the lock. holder string } // NewLock creates a new lock object. func NewLock(client *datastore.Client, name string) *Lock { return &Lock{ client: client, name: name, holder: uuid.New(), } } // lockKind is the datastore kind for locks. const lockKind = "lock" // lockData is the datastore lock entity. type lockData struct { // holder is the uuid for the holder of the lock. Holder string // End is the time until which the lock is held. End time.Time } // Acquire attempts to acquire the lock until the specified time. // Warning: The times used are local times, be careful with cross process clock skew, // stop assuming that you hold the lock slightly before you func (l *Lock) Acquire(ctx context.Context, end time.Time) error { key := datastore.NameKey(lockKind, l.name, nil) f := func(tx *datastore.Transaction) error { now := time.Now() lock := &lockData{} err := tx.Get(key, lock) if err != nil && err != datastore.ErrNoSuchEntity { return status.Errorf(codes.Internal, "reading %q failed: %v", l.name, err) } if lock.End.After(now) { return status.Errorf(codes.FailedPrecondition, "lock %q is reserved till %v > now = %v", l.name, lock.End, now) } lock = &lockData{Holder: l.holder, End: end} if _, err := tx.Put(key, lock); err != nil { return status.Errorf(codes.Internal, "writing lock %q failed: %v", l.name, err) } return nil } if _, err := l.client.RunInTransaction(ctx, f, datastore.MaxAttempts(1)); err != nil { return err } return nil } // Release attempts to release the holding of the named lock. func (l *Lock) Release(ctx context.Context) error { key := datastore.NameKey(lockKind, l.name, nil) f := func(tx *datastore.Transaction) error { now := time.Now() lock := &lockData{} err := tx.Get(key, lock) if err != nil && err != datastore.ErrNoSuchEntity { return status.Errorf(codes.Internal, "reading lock %q failed: %v", l.name, err) } if lock.Holder != l.holder { return status.Errorf(codes.FailedPrecondition, "lock %q is hold by another process", l.name) } if lock.End.Before(now) { return status.Errorf(codes.FailedPrecondition, "lock %q hold ended at %v", l.name, lock.End) } lock = &lockData{Holder: l.holder, End: now} if _, err := tx.Put(key, lock); err != nil { return status.Errorf(codes.Internal, "writing lock %q failed: %v", l.name, err) } return nil } if _, err := l.client.RunInTransaction(ctx, f, datastore.MaxAttempts(1)); err != nil { return err } return nil } // Reset clears the information about the lock from datastore. func (l *Lock) Reset(ctx context.Context) error { key := datastore.NameKey(lockKind, l.name, nil) f := func(tx *datastore.Transaction) error { if err := tx.Delete(key); err != nil { return status.Errorf(codes.Internal, "deleting lock %q failed: %v", l.name, err) } return nil } if _, err := l.client.RunInTransaction(ctx, f, datastore.MaxAttempts(1)); err != nil { return err } return nil }