persistence/queue.go (73 lines of code) (raw):

// Copyright 2017 Google Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package persistence import ( "encoding/json" ) // Queue implements a simple persistent queue. Each Queue function is threadsafe and atomic within // the scope of the Persistence instance that created it. However, there is no guarantee that // multiple Queue function calls operate atomically. E.g., if a caller calls Peek and then later // Dequeue, there's a chance that some other caller will have modified the queue in the meantime. type Queue interface { // Peek loads the object at the head of this Queue into obj. If successful, nil is returned and // obj will be populated. ErrNotFound is returned if the queue is empty or does not exist. Other // I/O errors may be returned in the event of I/O failures. Peek(obj interface{}) error // Dequeue removes the front of this Queue. If successful, nil is returned. ErrNotFound is // returned if the queue is empty or does not exist. Other I/O errors may be returned in the event // of I/O failures. If obj is non-nil, it will contain removed value upon success. Dequeue(obj interface{}) error // Enqueue stores obj at the back of this Queue. Returns nil if the object was stored, or an error // if something failed. Enqueue(obj interface{}) error } // Type valueQueue is a Queue that stores its state within a single value. Queue state is stored as // a json array. type valueQueue struct { value value } func (vq *valueQueue) Peek(obj interface{}) error { var queue []json.RawMessage // Grab the value's associated persistence read lock and load the queue vq.value.mutex().RLock() err := vq.value.load(&queue) vq.value.mutex().RUnlock() if err != nil { return err } // If the queue exists but is somehow empty, we return ErrNotFound if len(queue) == 0 { return ErrNotFound } // Unmarshal the front of the queue and store it in obj. if err := json.Unmarshal(queue[0], obj); err != nil { return err } return nil } func (vq *valueQueue) Dequeue(obj interface{}) error { var queue []json.RawMessage // Grab the value's associated persistence lock vq.value.mutex().Lock() defer vq.value.mutex().Unlock() // First, load the existing queue if err := vq.value.load(&queue); err != nil { return err } // If the queue exists but is somehow empty, we return ErrNotFound if len(queue) == 0 { return ErrNotFound } // If the caller passed a non-nil obj, store the value currently at the front of the queue. if obj != nil { if err := json.Unmarshal(queue[0], obj); err != nil { return err } } // Remove the front. If the new queue still has entries, store it. Else, remove the backing value. newq := queue[1:] if len(newq) > 0 { if err := vq.value.store(queue[1:]); err != nil { return err } } else { if err := vq.value.remove(); err != nil { return err } } return nil } func (vq *valueQueue) Enqueue(obj interface{}) error { var queue []json.RawMessage var err error var bytes []byte // First marshal the given object into json text. if bytes, err = json.Marshal(obj); err != nil { return err } // Grab the value's associated persistence lock vq.value.mutex().Lock() defer vq.value.mutex().Unlock() // Load the existing queue in preparation for updating if err = vq.value.load(&queue); err != nil && err != ErrNotFound { return err } // Append the new value to the end of the queue, and store the result. queue = append(queue, bytes) if err := vq.value.store(queue); err != nil { return err } return nil }