libbeat/statestore/backend/memlog/diskstore.go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package memlog
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strconv"
"github.com/elastic/beats/v7/libbeat/common/cleanup"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
// diskstore manages the on-disk state of the memlog store.
type diskstore struct {
log *logp.Logger
// store configuration
checkpointPred CheckpointPredicate
fileMode os.FileMode
bufferSize int
// on disk file tracking information
home string // home path of the store
logFilePath string // current log file
oldDataFiles []dataFileInfo // unused data files that can be removed
activeDataFile dataFileInfo // most recent data file that needs to be kept on disk
	// nextTxID is the sequential counter that tracks all updates to the
	// store. nextTxID is written with each operation being logged, and is
	// used as the file name for new data files.
nextTxID uint64
// log file access. The log file is updated using an in memory write buffer.
logFile *os.File
logBuf *bufio.Writer
// internal state and metrics
logFileSize uint64
logEntries uint
logInvalid bool
logNeedsTruncate bool
}
// dataFileInfo is used to track and sort on-disk data files.
// Normally only one data file should exist on disk, but in case delete
// operations have failed or were interrupted, dataFileInfo is used to
// determine the ordering.
//
// dataFileInfo can be ordered by txid. When sorting, isTxIDLessEqual should
// be used to get the correct ordering even in the case of integer overflows.
// For sorting a slice of dataFileInfo use sortDataFileInfos.
type dataFileInfo struct {
path string
txid uint64
}
// storeEntry is used to write entries to the checkpoint file only.
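// A checkpoint file holds a JSON array of such entries, with the key inlined
// into each object under "_key". A sketch of the resulting shape (the value
// fields shown are hypothetical; real fields depend on the stored states):
//
//	[
//	  {"_key": "key0", "offset": 10},
//	  {"_key": "key1", "offset": 42}
//	]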
type storeEntry struct {
Key string `struct:"_key"`
Fields mapstr.M `struct:",inline"`
}
// storeMeta is read from the meta file.
type storeMeta struct {
Version string `struct:"version"`
}
// logAction is prepended to each operation logged to the update file.
// It contains the action name and the update ID, a sequential counter used
// to verify correctness.
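// Each logged operation thus occupies two JSON documents in log.json: the
// logAction header, followed by the encoded operation on the next line. A
// sketch (the payload fields are illustrative; the actual fields are defined
// by the op types):
//
//	{"op": "set", "id": 42}
//	{"K": "key0", "V": {"field": "value"}}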
type logAction struct {
Op string `json:"op"`
ID uint64 `json:"id"`
}
const (
logFileName = "log.json"
metaFileName = "meta.json"
activeDataFileName = "active.dat"
activeDataTmpFileName = "active.dat.new"
checkpointTmpFileName = "checkpoint.new"
storeVersion = "1"
keyField = "_key"
)
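// Together these names describe the on-disk layout of a store directory, for
// example (assuming the last checkpoint was written with txid 42):
//
//	<home>/meta.json  - store metadata (version)
//	<home>/active.dat - contains the path of the current data file
//	<home>/log.json   - transaction log with updates since the last checkpoint
//	<home>/42.json    - data file written by the checkpoint with txid 42
//
// checkpoint.new and active.dat.new are temporary files that exist only while
// a checkpoint or marker update is in progress.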
// newDiskStore initializes the disk store structure only. The store must have
// been opened already. newDiskStore tries to open the update log file for
// append operations. If opening the update log file fails, the log is marked
// as invalid, triggering a checkpoint operation on the first update to the
// store.
func newDiskStore(
log *logp.Logger,
home string,
dataFiles []dataFileInfo,
txid uint64,
mode os.FileMode,
entries uint,
logInvalid bool,
bufferSize uint,
checkpointPred CheckpointPredicate,
) (*diskstore, error) {
var active dataFileInfo
if L := len(dataFiles); L > 0 {
active = dataFiles[L-1]
dataFiles = dataFiles[:L-1]
}
s := &diskstore{
log: log.With("path", home),
home: home,
logFilePath: filepath.Join(home, logFileName),
oldDataFiles: dataFiles,
activeDataFile: active,
nextTxID: txid + 1,
fileMode: mode,
bufferSize: int(bufferSize),
logFile: nil,
logBuf: nil,
logEntries: entries,
logInvalid: logInvalid,
logNeedsTruncate: false, // only truncate on next checkpoint
checkpointPred: checkpointPred,
}
	// Delete temporary files from an older instance that was interrupted
	// during a checkpoint operation.
	// Note: we do not delete old data files yet, in case we need them for
	// debugging, or to manually restore some older state after disk outages.
if err := os.Remove(filepath.Join(home, checkpointTmpFileName)); err != nil && !os.IsNotExist(err) {
return nil, err
}
if err := os.Remove(filepath.Join(home, activeDataTmpFileName)); err != nil && !os.IsNotExist(err) {
return nil, err
}
_ = s.tryOpenLog()
return s, nil
}
// tryOpenLog opens the update log file for writing. The log file is truncated
// if the last operation executed was a checkpoint.
// The log file is marked as invalid if opening it fails. This will trigger a
// checkpoint operation and another call to tryOpenLog in the future.
func (s *diskstore) tryOpenLog() error {
flags := os.O_RDWR | os.O_CREATE
if s.logNeedsTruncate {
flags |= os.O_TRUNC
}
f, err := os.OpenFile(s.logFilePath, flags, s.fileMode)
if err != nil {
s.log.Errorf("Failed to open file %v: %v", s.logFilePath, err)
return err
}
ok := false
defer cleanup.IfNot(&ok, func() {
f.Close()
})
_, err = f.Seek(0, io.SeekEnd)
if err != nil {
return err
}
if s.logNeedsTruncate {
s.logEntries = 0 // reset counter if file was truncated on Open
s.logFileSize = 0
} else {
info, err := f.Stat()
if err != nil {
return err
}
s.logFileSize = uint64(info.Size())
}
ok = true
s.logNeedsTruncate = false
s.logFile = f
s.logBuf = bufio.NewWriterSize(s.logFile, s.bufferSize)
return nil
}
// mustCheckpoint returns true if the store is required to execute a checkpoint
// operation, either by predicate or by some internal state detecting a problem
// with the log file.
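// A minimal CheckpointPredicate, as a sketch, might trigger a checkpoint once
// the log file exceeds a fixed size (the 10 MiB threshold is an arbitrary
// example, not a configured default):
//
//	var pred CheckpointPredicate = func(fileSize uint64) bool {
//		return fileSize >= 10 * 1024 * 1024
//	}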
func (s *diskstore) mustCheckpoint() bool {
return s.logInvalid || s.checkpointPred(s.logFileSize)
}
func (s *diskstore) Close() error {
if s.logFile != nil {
// always sync log file on ordinary shutdown.
err := s.logBuf.Flush()
if err == nil {
err = s.logFile.Sync()
}
s.logFile.Close()
s.logFile = nil
s.logBuf = nil
return err
}
return nil
}
// LogOperation appends another entry to the update log file.
// The log file is marked as invalid if the write fails. This will trigger a
// checkpoint operation in the future.
func (s *diskstore) LogOperation(op op) error {
if s.logInvalid {
return errLogInvalid
}
if s.logFile == nil {
		// We keep the store usable even if the log file is inaccessible, but
		// mark it as invalid. This will force a full state checkpoint.
		// tryOpenLog already logs the error; here the error only serves as an
		// indicator to invalidate the disk store, so we can try to recover by
		// checkpointing.
if err := s.tryOpenLog(); err != nil {
s.logInvalid = true
return err
}
}
writer := s.logBuf
counting := &countWriter{w: writer}
defer func() {
s.logFileSize += counting.n
}()
ok := false
defer cleanup.IfNot(&ok, func() {
s.logInvalid = true
})
enc := newJSONEncoder(counting)
if err := enc.Encode(logAction{Op: op.name(), ID: s.nextTxID}); err != nil {
return err
}
	_, _ = counting.Write([]byte{'\n'}) // write via counting, so the newline is included in logFileSize
if err := enc.Encode(op); err != nil {
return err
}
	_, _ = counting.Write([]byte{'\n'})
if err := writer.Flush(); err != nil {
return err
}
ok = true
s.logEntries++
s.nextTxID++
return nil
}
// WriteCheckpoint serializes all state into a JSON file. The file contains an
// array with all states known to the memory storage.
// WriteCheckpoint first serializes all state to a temporary file, and finally
// moves the temporary data file into the correct location. No files are
// overwritten or replaced. Instead the change sequence number is used for the
// filename, and older data files will be deleted after success.
//
// The active marker file is overwritten after all updates have succeeded. The
// marker file contains the filename of the current valid data file.
// NOTE: due to limitations on some operating systems or file systems, the
// active marker is not a symlink, but an actual file.
func (s *diskstore) WriteCheckpoint(state map[string]entry) error {
tmpPath, err := s.checkpointTmpFile(filepath.Join(s.home, checkpointTmpFileName), state)
if err != nil {
return err
}
	// Try to delete the temporary checkpoint file on error, ignoring any
	// failure. Deletion of tmpPath will fail if the rename operation
	// succeeded.
defer os.Remove(tmpPath)
	// The checkpoint is assigned the next available transaction id. This
	// guarantees that all existing log entries are 'older' than the checkpoint
	// file and subsequent operations. The first operation after a successful
	// checkpoint will be (fileTxID + 1).
fileTxID := s.nextTxID
fileName := fmt.Sprintf("%v.json", fileTxID)
checkpointPath := filepath.Join(s.home, fileName)
if err := os.Rename(tmpPath, checkpointPath); err != nil {
return err
}
trySyncPath(s.home)
// clear transaction log once finished
s.checkpointClearLog()
// finish current on-disk transaction by increasing the txid
s.nextTxID++
if s.activeDataFile.path != "" {
s.oldDataFiles = append(s.oldDataFiles, s.activeDataFile)
}
s.activeDataFile = dataFileInfo{
path: checkpointPath,
txid: fileTxID,
}
	// update the active marker and delete the old data files
_ = updateActiveMarker(s.log, s.home, s.activeDataFile.path)
s.removeOldDataFiles()
trySyncPath(s.home)
return nil
}
func (s *diskstore) checkpointTmpFile(tempfile string, states map[string]entry) (string, error) {
f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, s.fileMode)
if err != nil {
return "", err
}
ok := false
defer cleanup.IfNot(&ok, func() {
f.Close()
})
writer := bufio.NewWriterSize(f, s.bufferSize)
enc := newJSONEncoder(writer)
if _, err = writer.Write([]byte{'['}); err != nil {
return "", err
}
first := true
for key, entry := range states {
prefix := []byte(",\n")
if first {
prefix = prefix[1:]
first = false
}
if _, err = writer.Write(prefix); err != nil {
return "", err
}
err = enc.Encode(storeEntry{
Key: key,
Fields: entry.value,
})
if err != nil {
return "", err
}
}
if _, err = writer.Write([]byte("\n]")); err != nil {
return "", err
}
if err = writer.Flush(); err != nil {
return "", err
}
if err = f.Sync(); err != nil {
return "", err
}
ok = true
if err = f.Close(); err != nil {
return "", err
}
return tempfile, nil
}
func (s *diskstore) checkpointClearLog() {
if s.logFile == nil {
s.logNeedsTruncate = true
return
}
err := s.logFile.Truncate(0)
if err == nil {
_, err = s.logFile.Seek(0, io.SeekStart)
s.logInvalid = false
}
if err != nil {
s.logFile.Close()
s.logFile = nil
s.logBuf = nil
s.logNeedsTruncate = true
s.logInvalid = true
}
s.logEntries = 0
s.logFileSize = 0
}
// updateActiveMarker overwrites the active.dat file in the home directory with
// the path of the most recent checkpoint file.
// The active file will be written to `<homePath>`/active.dat.
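// After a successful update the marker file contains nothing but the path of
// the current data file, e.g. (for a checkpoint written with txid 42):
//
//	<home>/42.json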
func updateActiveMarker(log *logp.Logger, homePath, checkpointFilePath string) error {
activeLink := filepath.Join(homePath, activeDataFileName)
tmpLink := filepath.Join(homePath, activeDataTmpFileName)
log = log.With("temporary", tmpLink, "data_file", checkpointFilePath, "link_file", activeLink)
if checkpointFilePath == "" {
		// Best effort: remove active.dat, if present.
		if err := os.Remove(activeLink); err != nil && !os.IsNotExist(err) {
log.Errorf("Failed to remove old pointer file: %v", err)
}
return nil
}
	// Atomically try to update the pointer file to the most recent data file.
	// We 'simulate' the atomic update by creating the temporary active.dat.new
	// file, which we then rename to active.dat. Any leftover active.dat.new
	// from a previous attempt is removed first.
if err := os.Remove(tmpLink); err != nil && !os.IsNotExist(err) {
log.Errorf("Failed to remove old temporary active.dat.tmp file: %v", err)
return err
}
if err := os.WriteFile(tmpLink, []byte(checkpointFilePath), 0600); err != nil {
log.Errorf("Failed to write temporary pointer file: %v", err)
return err
}
if err := os.Rename(tmpLink, activeLink); err != nil {
log.Errorf("Failed to replace link file: %v", err)
return err
}
trySyncPath(homePath)
return nil
}
// removeOldDataFiles deletes all outdated data files from the storage
// directory, keeping only the active one. If a delete fails unexpectedly, the
// not-yet-deleted files stay in oldDataFiles for a retry on the next
// checkpoint.
func (s *diskstore) removeOldDataFiles() {
for i := range s.oldDataFiles {
path := s.oldDataFiles[i].path
err := os.Remove(path)
if err != nil && !os.IsNotExist(err) {
s.log.With("file", path).Errorf("Failed to delete old data file: %v", err)
s.oldDataFiles = s.oldDataFiles[i:]
return
}
}
s.oldDataFiles = nil
}
// listDataFiles returns a sorted list of data files with txid per file.
// The list is sorted by txid, in ascending order (taking integer overflows
// into account).
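// For example, a home directory containing 5.json, 12.json, meta.json, and
// log.json yields two dataFileInfo entries with txids 5 and 12, in that
// order; meta.json and log.json are skipped because their base names are not
// valid transaction IDs.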
func listDataFiles(home string) ([]dataFileInfo, error) {
files, err := filepath.Glob(filepath.Join(home, "*.json"))
if err != nil {
return nil, err
}
var infos []dataFileInfo
for i := range files {
info, err := os.Lstat(files[i])
if err != nil {
return nil, err
}
if !info.Mode().IsRegular() {
continue
}
name := filepath.Base(files[i])
name = name[:len(name)-5] // remove '.json' extension
id, err := strconv.ParseUint(name, 10, 64)
if err == nil {
infos = append(infos, dataFileInfo{
path: files[i],
txid: id,
})
}
}
	// Zero or one data file: either the store is empty, or the most recent
	// checkpoint completed and the old data files were already deleted.
if len(infos) <= 1 {
return infos, nil
}
// sort files by transaction ID
sortDataFileInfos(infos)
return infos, nil
}
// sortDataFileInfos sorts the slice by the files' txid, taking integer
// overflow into account.
func sortDataFileInfos(infos []dataFileInfo) {
sort.Slice(infos, func(i, j int) bool {
return isTxIDLessEqual(infos[i].txid, infos[j].txid)
})
}
// loadDataFile adds all key/value pairs found in the data file at path to tbl.
func loadDataFile(path string, tbl map[string]entry) error {
if path == "" {
return nil
}
err := readDataFile(path, func(key string, state mapstr.M) {
tbl[key] = entry{value: state}
})
return err
}
var ErrCorruptStore = errors.New("corrupted data file")
func readDataFile(path string, fn func(string, mapstr.M)) error {
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
var states []map[string]interface{}
dec := json.NewDecoder(f)
if err := dec.Decode(&states); err != nil {
return fmt.Errorf("%w: %w", ErrCorruptStore, err)
}
for _, state := range states {
keyRaw := state[keyField]
key, ok := keyRaw.(string)
if !ok {
continue
}
delete(state, keyField)
fn(key, mapstr.M(state))
}
return nil
}
// loadLogFile applies all recorded transactions to an already initialized
// memStore.
// The txid argument is the transaction ID of the last known valid data file.
// Transactions older than txid will be ignored.
// loadLogFile returns the last committed txid in logTxid and the number of
// applied operations in entries.
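// For example, if the current data file was written with txid 5 and log.json
// holds operations with IDs 4, 5, 6, and 7, then 4 and 5 are skipped (they
// are already contained in the data file), while 6 and 7 are replayed. A gap
// in the sequence (e.g. 6 followed by 8) yields errTxIDInvalid.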
func loadLogFile(
store *memstore,
txid uint64,
home string,
) (logTxid uint64, entries uint, err error) {
err = readLogFile(home, func(rawOp op, id uint64) error {
		// Ignore old entries, in case the log file was not truncated after the
		// last checkpoint (e.g. the Beat restarted before truncation happened).
if isTxIDLessEqual(id, txid) {
return nil
}
if id != txid+1 {
return errTxIDInvalid
}
txid = id
switch op := rawOp.(type) {
case *opSet:
entries++
store.Set(op.K, op.V)
case *opRemove:
entries++
store.Remove(op.K)
}
return nil
})
	return txid, entries, err
}
// readLogFile iterates all operations found in the transaction log.
func readLogFile(home string, fn func(op, uint64) error) error {
path := filepath.Join(home, logFileName)
f, err := os.Open(path)
	if os.IsNotExist(err) {
		return nil
	}
	if err != nil {
		return err
	}
	defer f.Close()
dec := json.NewDecoder(f)
for dec.More() {
var act logAction
if err := dec.Decode(&act); err != nil {
return err
}
var op op
		switch act.Op {
		case opValSet:
			op = &opSet{}
		case opValRemove:
			op = &opRemove{}
		default:
			return fmt.Errorf("unknown operation %q in log file", act.Op)
		}
if err := dec.Decode(op); err != nil {
return err
}
if err := fn(op, act.ID); err != nil {
return err
}
}
return nil
}
func checkMeta(meta storeMeta) error {
if meta.Version != storeVersion {
return fmt.Errorf("store version %v not supported", meta.Version)
}
return nil
}
func writeMetaFile(home string, mode os.FileMode) error {
path := filepath.Join(home, metaFileName)
f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_TRUNC, mode)
if err != nil {
return err
}
ok := false
defer cleanup.IfNot(&ok, func() {
f.Close()
})
enc := newJSONEncoder(f)
err = enc.Encode(storeMeta{
Version: storeVersion,
})
if err != nil {
return err
}
if err := f.Sync(); err != nil {
return err
}
ok = true
if err := f.Close(); err != nil {
return err
}
trySyncPath(home)
return nil
}
func readMetaFile(home string) (storeMeta, error) {
var meta storeMeta
path := filepath.Join(home, metaFileName)
f, err := os.Open(path)
if err != nil {
return meta, err
}
defer f.Close()
dec := json.NewDecoder(f)
if err := dec.Decode(&meta); err != nil {
return meta, fmt.Errorf("can not read store meta file: %w", err)
}
return meta, nil
}
// isTxIDLessEqual compares two transaction IDs, taking unsigned integer
// overflow into account. It returns true if b is at most 2^63 steps ahead of
// a (modulo 2^64), that is if:
//   - a == b, or
//   - a < b, or
//   - b is numerically smaller than a, but only because b wrapped around
//     after an integer overflow, and is still within a distance of 2^63.
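// For example:
//
//	isTxIDLessEqual(10, 10)            // true:  a == b
//	isTxIDLessEqual(10, 11)            // true:  a < b
//	isTxIDLessEqual(math.MaxUint64, 0) // true:  b wrapped around after overflow
//	isTxIDLessEqual(11, 10)            // false: a > b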
func isTxIDLessEqual(a, b uint64) bool {
return int64(a-b) <= 0
}