cli/azd/internal/telemetry/storage.go (255 lines of code) (raw):
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package telemetry
import (
"context"
"errors"
"fmt"
"io/fs"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/azure/azure-dev/cli/azd/pkg/osutil"
"github.com/benbjohnson/clock"
)
// A time.Format layout suitable for use in file names.
const fsTimeLayout = "20060102T150405"
// Max time before temp files are cleaned up
const tempFileTtl = time.Duration(5) * time.Minute
type Queue interface {
Enqueue(message []byte) error
EnqueueWithDelay(message []byte, delayDuration time.Duration, retryCount int) error
Peek() (*StoredItem, error)
Remove(item *StoredItem) error
}
// StorageQueue is a FIFO-based queue backed by disk storage, with items stored as individual files.
// The current implementation allows for multiple producers, single consumer.
//
// Items can be queued by Enqueue or EnqueueWithDelay.
// QueueWithDelay allows for producers to queue items that should not be picked up
// by consumers until after the specified duration has passed. This is useful for retry delay scheduling.
//
// Items can be read by Peek, which will read the next available item.
// Once the item is processed, consumers are responsible for calling Remove to remove the item from the queue.
type StorageQueue struct {
folder string
itemFileExtension string
itemFileMaxTimeKept time.Duration
// Standard time library clock, unless mocked in tests
clock clock.Clock
}
type StoredItem struct {
// Number of retries attempted
retryCount int
// Message in the item
message []byte
// File name of the stored item
fileName string
}
func (itm *StoredItem) RetryCount() int {
return itm.retryCount
}
func (itm *StoredItem) Message() []byte {
return itm.message
}
type itemEntry struct {
name string
readyTime time.Time
fileModTime time.Time
retryCount int
}
// Creates the storage-based queue.
func NewStorageQueue(
folder string, itemFileExtension string, itemFileMaxTimeKept time.Duration) (*StorageQueue, error) {
if err := os.MkdirAll(folder, osutil.PermissionDirectory); err != nil {
return nil, fmt.Errorf("failed to create storage queue folder: %w", err)
}
if !strings.HasPrefix(itemFileExtension, ".") {
itemFileExtension = "." + itemFileExtension
}
storage := StorageQueue{
folder: folder,
itemFileExtension: itemFileExtension,
itemFileMaxTimeKept: itemFileMaxTimeKept,
clock: clock.New(),
}
return &storage, nil
}
// Queues a message.
func (stg *StorageQueue) Enqueue(message []byte) error {
return stg.save(time.Duration(0), 0, message)
}
// Queues a message with delay.
func (stg *StorageQueue) EnqueueWithDelay(message []byte, delayDuration time.Duration, retryCount int) error {
return stg.save(delayDuration, retryCount, message)
}
func (stg *StorageQueue) save(delayDuration time.Duration, retryCount int, message []byte) error {
file, err := os.CreateTemp(stg.folder, "*_itm.tmp")
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
tempFileName := file.Name()
err = os.WriteFile(tempFileName, message, osutil.PermissionFile)
if err != nil {
_ = removeIfExists(tempFileName)
return fmt.Errorf("failed to write file: %w", err)
}
file.Close()
generatedFileName := filepath.Base(tempFileName)
randomSuffix := generatedFileName[:strings.LastIndex(generatedFileName, "_")]
readyTime := stg.clock.Now().Add(delayDuration)
fileName := formatFileName(readyTime, retryCount, randomSuffix, stg.itemFileExtension)
err = os.Rename(tempFileName, filepath.Join(stg.folder, fileName))
if err != nil {
_ = removeIfExists(tempFileName)
return fmt.Errorf("failed to rename file: %w", err)
}
return nil
}
// Gets the next available item for processing.
// Returns nil if no items exist.
// Returns error if an error occurs while reading storage.
func (stg *StorageQueue) Peek() (*StoredItem, error) {
items, err := stg.getAllItemsUnordered()
if err != nil {
return nil, fmt.Errorf("failed to get stored files: %w", err)
}
leastRecentTime := time.Time{}
latestIndex := -1
now := stg.clock.Now()
for i, item := range items {
timeSinceReady := now.Sub(item.readyTime)
if timeSinceReady >= 0 && timeSinceReady < stg.itemFileMaxTimeKept {
if latestIndex == -1 || item.fileModTime.Before(leastRecentTime) {
leastRecentTime = item.fileModTime
latestIndex = i
}
}
}
if latestIndex == -1 {
return nil, nil
}
item := items[latestIndex]
fileName := filepath.Join(stg.folder, item.name)
message, err := os.ReadFile(fileName)
if err != nil {
return nil, fmt.Errorf("failed to read latest stored item: %w", err)
}
return &StoredItem{
fileName: fileName,
retryCount: item.retryCount,
message: message,
}, nil
}
// Removes the stored item from queue.
// Does not return an error if the item is already removed.
func (stg *StorageQueue) Remove(item *StoredItem) error {
err := os.Remove(item.fileName)
if errors.Is(err, os.ErrNotExist) {
return nil
}
if err != nil {
return fmt.Errorf("failed to remove stored item: %w", err)
}
return nil
}
func removeIfExists(filename string) error {
err := os.Remove(filename)
if errors.Is(err, os.ErrNotExist) {
return nil
}
return err
}
// Scans the storage directory for any obsoleted items or temp files.
func (stg *StorageQueue) Cleanup(ctx context.Context, done chan (struct{})) {
defer func() { done <- struct{}{} }()
files, err := os.ReadDir(stg.folder)
if err != nil {
return
}
for _, file := range files {
select {
case <-ctx.Done():
return
default:
if file.IsDir() {
continue
}
stg.checkFileForCleanup(file)
}
}
}
func (stg *StorageQueue) checkFileForCleanup(file fs.DirEntry) {
name := file.Name()
if strings.HasSuffix(name, ".tmp") {
stg.checkTempFileForCleanup(file)
} else if strings.HasSuffix(name, stg.itemFileExtension) {
stg.checkItemFileForCleanup(file)
}
}
func (stg *StorageQueue) checkTempFileForCleanup(file fs.DirEntry) {
info, err := file.Info()
if err != nil {
log.Printf("failed to retrieve old tmp file info for %s: %s", file.Name(), err)
return
}
if stg.clock.Since(info.ModTime()) >= tempFileTtl {
stg.cleanupItem(file, "old tmp")
}
}
func (stg *StorageQueue) checkItemFileForCleanup(file fs.DirEntry) {
item, ok := parseFileName(file.Name())
if !ok {
stg.cleanupItem(file, "unparsable item")
return
}
if stg.clock.Since(item.readyTime) >= stg.itemFileMaxTimeKept {
stg.cleanupItem(file, "old item")
}
}
func (stg *StorageQueue) cleanupItem(file fs.DirEntry, itemType string) {
err := removeIfExists(filepath.Join(stg.folder, file.Name()))
if err != nil {
log.Printf("failed to remove %s file: %s", itemType, err)
}
}
func (stg *StorageQueue) getAllItemsUnordered() ([]itemEntry, error) {
dirEntries, err := readDirUnordered(stg.folder)
if err != nil {
return nil, fmt.Errorf("error reading folder: %w", err)
}
items := []itemEntry{}
for _, entry := range dirEntries {
if !entry.IsDir() && strings.HasSuffix(entry.Name(), stg.itemFileExtension) {
item, ok := readItemEntry(entry)
if ok {
items = append(items, *item)
}
}
}
return items, nil
}
// readDirUnordered is os.ReadDir except it returns entries unordered instead of by name order
func readDirUnordered(name string) ([]os.DirEntry, error) {
f, err := os.Open(name)
if err != nil {
return nil, err
}
defer f.Close()
dirs, err := f.ReadDir(-1)
return dirs, err
}
func formatFileName(readyTime time.Time, retryCount int, uniqueSuffix string, extension string) string {
return fmt.Sprintf("%s_%d_%s%s",
readyTime.UTC().Format(fsTimeLayout),
retryCount,
uniqueSuffix,
extension)
}
func readItemEntry(dirEntry os.DirEntry) (*itemEntry, bool) {
item, ok := parseFileName(dirEntry.Name())
if !ok {
return nil, false
}
info, err := dirEntry.Info()
if err != nil {
return nil, false
}
item.fileModTime = info.ModTime()
return item, true
}
func parseFileName(name string) (*itemEntry, bool) {
sections := strings.Split(name, "_")
if len(sections) < 2 {
return nil, false
}
timestamp, err := time.Parse(fsTimeLayout, sections[0])
if err != nil {
return nil, false
}
retryCount, err := strconv.Atoi(sections[1])
if err != nil {
return nil, false
}
return &itemEntry{
name: name,
readyTime: timestamp,
retryCount: retryCount,
}, true
}