series_migrator.go (305 lines of code) (raw):
package main
import (
"context"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"golang.org/x/sync/errgroup"
"github.com/prometheus/tsdb/labels"
pb "gopkg.in/cheggaaa/pb.v1"
// This is a copy of the v1.8.2 Prometheus local storage code, with the only
// modification being that it allows checking whether all memory series have been persisted.
v1 "gitlab.com/gitlab-org/prometheus-storage-migrator/v1storage"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/local/index"
"github.com/prometheus/prometheus/util/flock"
"github.com/prometheus/tsdb"
)
const (
	// storageVersion is the v1 storage version expected in the version file.
	// Only version 1 is supported (there were no subsequent ones).
	storageVersion = 1

	// Names of the marker files located directly in the v1 storage directory.
	versionFileName = "VERSION"
	dirtyFileName   = "DIRTY"

	// A series file lives in a directory named after the first
	// seriesDirNameLen characters of its fingerprint string; the remaining
	// characters plus seriesFileSuffix form the file name.
	seriesDirNameLen = 2
	seriesFileSuffix = ".db"

	// Layout of the header that precedes every chunk in a series file. The
	// offsets are byte positions within the header.
	chunkHeaderLen             = 17
	chunkHeaderTypeOffset      = 0
	chunkHeaderFirstTimeOffset = 1
	chunkHeaderLastTimeOffset  = 9

	// chunkLenWithHeader is the size in bytes of one chunk record on disk,
	// i.e. the chunk payload plus its header.
	chunkLenWithHeader = chunk.ChunkLen + chunkHeaderLen
)
var (
	// fpLen is the number of characters in the string form of a fingerprint.
	fpLen = len(model.Fingerprint(0).String())

	// seriesDirNameFmt is the format used to render a series directory index
	// as zero-padded hexadecimal of width seriesDirNameLen.
	seriesDirNameFmt = "%0" + strconv.Itoa(seriesDirNameLen) + "x"
)
// seriesFileTracker remembers, for a single series file on disk, how far the
// migration of that file has progressed.
type seriesFileTracker struct {
	// fp is the fingerprint of the series; the series file path is derived
	// from its string form.
	fp model.Fingerprint
	// curChunk is the index of the next chunk to read from the series file.
	// It is set to -1 once the end of the file has been reached.
	curChunk int
	// labels is the label set handed to the v2 appender for this series.
	labels labels.Labels
	// v2Ref is the series reference last returned by the v2 appender, or
	// zero if none has been obtained yet.
	v2Ref uint64
	// skipUnknownLabels makes migrate skip a sample, rather than fail, when
	// adding it to the v2 storage yields tsdb.ErrNotFound.
	skipUnknownLabels bool
}
// migrate appends the samples of this series whose timestamps fall within
// [from, through] to app, starting at the chunk recorded in tr.curChunk.
//
// path is the v1 storage directory; the series file is looked up beneath it
// using the fingerprint string split into a directory prefix and a file name.
//
// The method returns nil as soon as it reaches a chunk or a sample that lies
// after through, leaving tr.curChunk at that chunk so that the next call
// resumes there. When the end of the file is reached, tr.curChunk is set to
// -1, which done reports as completion. Neither Commit nor Rollback is called
// on app; that is left to the caller.
//
// An error is returned if the file cannot be opened, positioned or read, if a
// chunk is truncated or cannot be decoded, or if a sample cannot be appended.
func (tr *seriesFileTracker) migrate(from, through model.Time, path string, app tsdb.Appender) error {
	if tr.done() {
		// Nothing left to read; seeking to a negative offset would only fail.
		return nil
	}

	fpStr := tr.fp.String()
	filename := filepath.Join(path, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix)
	f, err := os.Open(filename)
	if err != nil {
		return fmt.Errorf("error opening chunk file: %s", err)
	}
	defer f.Close()

	buf := make([]byte, chunkLenWithHeader) // TODO: Share?
	for {
		// Compute the offset in 64 bits so that large series files do not
		// overflow int on 32-bit platforms.
		offset := int64(tr.curChunk) * int64(chunkLenWithHeader)
		if _, err := f.Seek(offset, io.SeekStart); err != nil {
			return fmt.Errorf("error seeking to chunk index %d in file %s: %s", tr.curChunk, filename, err)
		}

		// io.ReadFull keeps reading until the buffer is full, so a short
		// read is only reported when the file really ends mid-chunk.
		n, err := io.ReadFull(f, buf) // TODO: read multiple chunks at once?
		switch err {
		case nil:
			// A complete chunk record was read.
		case io.EOF:
			// No bytes left: every chunk of this series has been processed.
			tr.curChunk = -1
			return nil
		case io.ErrUnexpectedEOF:
			return fmt.Errorf("read incomplete chunk (%d bytes) for file %s at chunk index %d", n, filename, tr.curChunk)
		default:
			return fmt.Errorf("error reading chunk %d for file %s: %s", tr.curChunk, filename, err)
		}

		// The header carries the chunk's first timestamp, so chunks that
		// start after the requested range can be skipped without decoding.
		firstTime := model.Time(binary.LittleEndian.Uint64(buf[chunkHeaderFirstTimeOffset:]))
		if firstTime > through {
			return nil
		}

		c, err := chunk.NewForEncoding(chunk.Encoding(buf[chunkHeaderTypeOffset]))
		if err != nil {
			return fmt.Errorf("unable to create chunk from index %d in file %s: %s", tr.curChunk, filename, err)
		}
		if err := c.UnmarshalFromBuf(buf[chunkHeaderLen:]); err != nil {
			return fmt.Errorf("unable to unmarshal chunk %d in file %s: %s", tr.curChunk, filename, err)
		}

		it := c.NewIterator()
		for it.Scan() {
			sp := it.Value()
			if sp.Timestamp.Before(from) {
				// Skip through this chunk until we hit a sample within our range.
				continue
			}
			if sp.Timestamp.After(through) {
				// We've found a sample past our range, everything afterwards
				// we don't care about anymore. curChunk is left untouched so
				// the next call re-reads this chunk.
				return nil
			}

			// NOTE(review): the label-based Add is used when a reference is
			// already known, and AddFast is only tried while v2Ref is zero.
			// This looks inverted, but the error message below matches the
			// condition as written - confirm the intent before changing it.
			if tr.v2Ref != 0 {
				tr.v2Ref, err = app.Add(tr.labels, int64(sp.Timestamp), float64(sp.Value))
				if err != nil {
					// Unwrap with errors.Cause, as for AddFast below, so that
					// a wrapped ErrNotFound is recognised as well.
					if errors.Cause(err) == tsdb.ErrNotFound && tr.skipUnknownLabels {
						continue
					}
					return fmt.Errorf("v2Ref non-empty; unable to add to v2 storage: %s", err)
				}
			} else {
				addErr := app.AddFast(tr.v2Ref, int64(sp.Timestamp), float64(sp.Value))
				switch errors.Cause(addErr) {
				case nil:
					// All good.
				case tsdb.ErrNotFound:
					// The reference is unknown to the appender; fall back to
					// adding by label set, which yields a fresh reference.
					tr.v2Ref, addErr = app.Add(tr.labels, int64(sp.Timestamp), float64(sp.Value))
					if addErr != nil {
						if errors.Cause(addErr) == tsdb.ErrNotFound && tr.skipUnknownLabels {
							continue
						}
						return fmt.Errorf("unable to add to v2 storage: %s", addErr)
					}
				default:
					return fmt.Errorf("unable to fast-add to v2 storage: %s", addErr)
				}
			}
		}
		// Report the iterator's own error rather than an unrelated variable.
		if err := it.Err(); err != nil {
			return fmt.Errorf("error iterating through chunk %d in file %s: %s", tr.curChunk, filename, err)
		}
		tr.curChunk++
	}
}
// done reports whether the series file has been read to its end, which
// migrate records by setting curChunk to -1.
func (tr *seriesFileTracker) done() bool {
	const exhausted = -1
	return tr.curChunk == exhausted
}
// storageMigrator drives the migration of a complete v1 storage directory
// into the v2 format.
type storageMigrator struct {
	// v1Path is the v1 storage directory to read from.
	v1Path string
	// v2Storage supplies the appenders the migrated samples are written to.
	v2Storage tsdb.Appendable
	// parallelism is the number of series files migrated concurrently within
	// a given migration time step.
	parallelism int
	// trackers holds one entry per series file; it is filled by initTrackers.
	trackers []*seriesFileTracker
	// fLock is the lock on the dirty file, set by lockV1Storage and released
	// by unlockV1Storage.
	fLock flock.Releaser
	// skipUnknownLabels is copied into every tracker created by initTrackers.
	skipUnknownLabels bool
}
// migrate does a complete storage migration from v1 to v2 format.
//
// It drains the v1 head chunks to disk, locks the v1 directory, builds one
// tracker per series file and then walks forward in time from now-lookback in
// increments of step, migrating each time window for all series, until every
// tracker reports completion.
//
// step must be positive: it is used as a divisor for the progress bar and as
// the loop increment, so zero would panic and a negative value would never
// terminate. An error is returned in that case before the storage is touched.
func (m *storageMigrator) migrate(ctx context.Context, lookback time.Duration, step time.Duration) error {
	if step <= 0 {
		return fmt.Errorf("migration step must be positive, got %s", step)
	}

	if err := m.drainHeads(); err != nil {
		return fmt.Errorf("error draining heads: %s", err)
	}
	if err := m.lockV1Storage(); err != nil {
		return fmt.Errorf("error locking v1 storage: %s", err)
	}
	// Unlocking is best effort; its error is deliberately not propagated.
	defer m.unlockV1Storage()

	if err := m.initTrackers(ctx); err != nil {
		return fmt.Errorf("error initializing series file migrator: %s", err)
	}

	// The progress bar total is the number of steps covering the lookback
	// window; the loop may run longer if series files hold later data.
	bar := pb.StartNew(int(lookback / step))
	for t := model.Now().Add(-lookback); !m.done(); t = t.Add(step) {
		bar.Increment()
		// The upper bound is inclusive, hence the -1 so that consecutive
		// windows do not overlap.
		if err := m.migrateStep(ctx, t, t.Add(step)-1, m.v2Storage); err != nil {
			return fmt.Errorf("error migrating time step: %s", err)
		}
	}
	bar.FinishPrint("Migration complete.")
	return nil
}
// drainHeads flushes all outstanding v1 chunks into their series files. It
// starts the v1 storage on m.v1Path with options chosen to force persistence,
// polls once per second until the storage reports its heads as drained, and
// then stops the storage again.
func (m *storageMigrator) drainHeads() error {
	// Effectively "never" for both retention and checkpointing.
	const never = 999999 * time.Hour

	opts := &v1.MemorySeriesStorageOptions{
		// Create maximum persistence pressure.
		TargetHeapSize: 1,
		// Ensure that no old data is thrown away.
		PersistenceRetentionPeriod: never,
		PersistenceStoragePath:     m.v1Path,
		// Make sure that head chunks are closed and persisted as soon as possible.
		HeadChunkTimeout: 0,
		// We don't need intermediate checkpoints.
		CheckpointInterval:         never,
		CheckpointDirtySeriesLimit: 1e9,
		MinShrinkRatio:             0.5,
		SyncStrategy:               v1.Adaptive,
	}
	storage := v1.NewMemorySeriesStorage(opts)

	if err := storage.Start(); err != nil {
		return fmt.Errorf("error starting v1 storage: %s", err)
	}

	// TODO: add timeout.
	for !storage.HeadsDrained() {
		time.Sleep(time.Second)
	}

	if err := storage.Stop(); err != nil {
		return fmt.Errorf("error stopping v1 storage: %s", err)
	}
	return nil
}
// lockV1Storage verifies the version file of the v1 storage directory and
// then takes the lock on its dirty file, storing the lock in m.fLock.
//
// It fails if the version file is missing, unreadable, unparsable or names a
// version other than storageVersion, if the lock cannot be acquired, or if
// the dirty file already existed. In the latter case the lock is released
// again before returning, so a failed call never leaves the directory locked.
func (m *storageMigrator) lockV1Storage() error {
	dirtyPath := filepath.Join(m.v1Path, dirtyFileName)
	versionPath := filepath.Join(m.v1Path, versionFileName)

	versionData, err := ioutil.ReadFile(versionPath)
	if os.IsNotExist(err) {
		return fmt.Errorf("no version file found in v1 storage directory, cannot migrate storage with unknown version")
	}
	if err != nil {
		// Any other read failure leaves the version unknown as well; do not
		// proceed as if the check had passed.
		return fmt.Errorf("error reading %q: %s", versionPath, err)
	}
	persistedVersion, err := strconv.Atoi(strings.TrimSpace(string(versionData)))
	if err != nil {
		return fmt.Errorf("cannot parse content of %q: %s", versionPath, versionData)
	}
	if persistedVersion != storageVersion {
		return fmt.Errorf("found v1 storage storage version %d on disk, need version %d", persistedVersion, storageVersion)
	}

	fLock, dirtyfileExisted, err := flock.New(dirtyPath)
	if err != nil {
		return fmt.Errorf("unable to lock %q: %s", dirtyPath, err)
	}
	if dirtyfileExisted {
		// This can only happen if the v1 directory becomes dirty for some reason
		// between draining the heads file and locking the directory manually here.
		// Give the lock back (the dirty file itself is left in place); the
		// dirty state is the error worth reporting, so a release failure is
		// intentionally not surfaced.
		_ = fLock.Release()
		return fmt.Errorf("v1 storage dir is dirty, please perform a crash recovery before retrying the migration")
	}
	m.fLock = fLock
	return nil
}
// unlockV1Storage removes the dirty file of the v1 storage directory and
// releases the lock held in m.fLock. It is a no-op when no lock is held.
//
// If removing the dirty file fails before the lock is released, the removal
// is retried afterwards. The returned error is the failure of that retry if
// there is one, otherwise the error from releasing the lock, otherwise nil.
func (m *storageMigrator) unlockV1Storage() error {
	if m.fLock == nil {
		// Nothing was locked, so there is nothing to release or clean up.
		return nil
	}
	dirtyPath := filepath.Join(m.v1Path, dirtyFileName)

	removeErr := os.Remove(dirtyPath)
	releaseErr := m.fLock.Release()
	// TODO: Do we want to log the release error if it's not the last?
	m.fLock = nil

	if removeErr != nil {
		// On Windows, removing the dirty file before unlocking is not
		// possible. So remove it here if it failed above.
		if err := os.Remove(dirtyPath); err != nil {
			return err
		}
	}
	// A successful retry must not hide a failed release.
	return releaseErr
}
// initTrackers initializes the migrator's series trackers from the v1 storage dir.
// It scans every possible series directory below m.v1Path, and for each file
// found there parses the fingerprint from the directory and file name, looks
// up the metric in the fingerprint-metric index and appends a tracker
// starting at chunk 0 to m.trackers.
//
// Series directories that do not exist are skipped. The scan is abandoned
// with ctx.Err() if ctx is done between two directories. An error is returned
// if the index cannot be opened, a directory cannot be read, a file name does
// not yield a fingerprint, or a fingerprint has no metric in the index.
// The index and every directory handle are closed before returning.
func (m *storageMigrator) initTrackers(ctx context.Context) error {
	archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(m.v1Path)
	if err != nil {
		return fmt.Errorf("error opening fingerprint-metric index: %s", err)
	}
	// The index is only read while the trackers are built.
	defer archivedFingerprintToMetrics.Close()

	// scanDir adds a tracker for every file in one series directory. It is a
	// closure so that the deferred Close runs once per directory.
	scanDir := func(dirname string) error {
		dir, err := os.Open(dirname)
		if os.IsNotExist(err) {
			return nil
		}
		if err != nil {
			return err
		}
		defer dir.Close()

		for {
			fis, err := dir.Readdir(1024)
			if err == io.EOF {
				return nil
			}
			if err != nil {
				return err
			}
			for _, fi := range fis {
				name := fi.Name()
				// Guard the slice below against names shorter than the
				// fingerprint remainder, which would otherwise panic.
				if len(name) < fpLen-seriesDirNameLen {
					return fmt.Errorf("error parsing file name %s: name is too short to contain a fingerprint", filepath.Join(dirname, name))
				}
				fp, err := model.FingerprintFromString(filepath.Base(dirname) + name[:fpLen-seriesDirNameLen])
				if err != nil {
					return fmt.Errorf("error parsing file name %s: %s", filepath.Join(dirname, name), err)
				}
				metric, ok, err := archivedFingerprintToMetrics.Lookup(fp)
				if err != nil {
					return fmt.Errorf("error looking up metric for fingerprint %v: %s", fp, err)
				}
				if !ok {
					return fmt.Errorf("unable to find metric for fingerprint %v", fp)
				}
				// Convert the v1 metric into a v2 label set.
				ls := make(labels.Labels, 0, len(metric))
				for k, v := range metric {
					ls = append(ls, labels.Label{Name: string(k), Value: string(v)})
				}
				m.trackers = append(m.trackers, &seriesFileTracker{
					fp:                fp,
					curChunk:          0,
					labels:            labels.New(ls...),
					skipUnknownLabels: m.skipUnknownLabels,
				})
			}
		}
	}

	// Series directories are named by seriesDirNameLen hex digits, so there
	// are 16^seriesDirNameLen candidates.
	for i := 0; i < 1<<(seriesDirNameLen*4); i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		dirname := filepath.Join(m.v1Path, fmt.Sprintf(seriesDirNameFmt, i))
		if err := scanDir(dirname); err != nil {
			return err
		}
	}
	return nil
}
// migrateStep migrates the time range [from, through] for all series that
// are not done yet.
//
// One goroutine feeds the unfinished trackers into a channel, and a pool of
// worker goroutines consumes them. Each tracker is migrated with its own
// appender obtained from appendable, which is committed on success and
// rolled back on failure. The first error cancels the remaining work and is
// returned; cancellation of ctx is honored as well.
//
// The pool has m.parallelism workers, but never fewer than one: without a
// consumer the feeding goroutine would block forever.
func (m *storageMigrator) migrateStep(ctx context.Context, from, through model.Time, appendable tsdb.Appendable) error {
	workers := m.parallelism
	if workers < 1 {
		workers = 1
	}

	g, ctx := errgroup.WithContext(ctx)
	trackers := make(chan *seriesFileTracker)

	// Producer: hand out every tracker that still has chunks to migrate.
	g.Go(func() error {
		defer close(trackers)
		for _, tr := range m.trackers {
			if tr.done() {
				continue
			}
			select {
			case trackers <- tr:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Consumers: migrate one tracker at a time, each in its own transaction.
	for i := 0; i < workers; i++ {
		g.Go(func() error {
			for tr := range trackers {
				app := appendable.Appender()
				if err := tr.migrate(from, through, m.v1Path, app); err != nil {
					app.Rollback() // TODO: Log error?
					return err
				}
				if err := app.Commit(); err != nil {
					return fmt.Errorf("error committing samples: %s", err)
				}
				// Stop early once another goroutine failed or ctx was cancelled.
				select {
				case <-ctx.Done():
					return ctx.Err()
				default:
				}
			}
			return nil
		})
	}
	return g.Wait()
}
// done reports whether every series tracker has finished, i.e. no series has
// any chunks left to migrate. It also returns true when there are no trackers.
func (m *storageMigrator) done() bool {
	// TODO: Change this to remove series trackers completely when done?
	for i := range m.trackers {
		if m.trackers[i].done() {
			continue
		}
		return false
	}
	return true
}