series_migrator.go (305 lines of code) (raw):

package main import ( "context" "encoding/binary" "fmt" "io" "io/ioutil" "os" "path/filepath" "strconv" "strings" "time" "golang.org/x/sync/errgroup" "github.com/prometheus/tsdb/labels" pb "gopkg.in/cheggaaa/pb.v1" // This is a copy of the v1.8.2 Prometheus local storage code, with the only // modification being that it allows checking whether all memory series have been persisted. v1 "gitlab.com/gitlab-org/prometheus-storage-migrator/v1storage" "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage/local/chunk" "github.com/prometheus/prometheus/storage/local/index" "github.com/prometheus/prometheus/util/flock" "github.com/prometheus/tsdb" ) const ( // Version of the storage as it can be found in the version file. // Only version 1 is supported (there were no subsequent ones). storageVersion = 1 versionFileName = "VERSION" dirtyFileName = "DIRTY" seriesDirNameLen = 2 seriesFileSuffix = ".db" chunkHeaderLen = 17 chunkHeaderTypeOffset = 0 chunkHeaderFirstTimeOffset = 1 chunkHeaderLastTimeOffset = 9 chunkLenWithHeader = chunk.ChunkLen + chunkHeaderLen ) var ( fpLen = len(model.Fingerprint(0).String()) // Length of a fingerprint as string. seriesDirNameFmt = fmt.Sprintf("%%0%dx", seriesDirNameLen) ) // A seriesFileTracker keeps track of a series on disk and how far it has been migrated already. type seriesFileTracker struct { fp model.Fingerprint curChunk int labels labels.Labels v2Ref uint64 skipUnknownLabels bool } // migrate migrates a given range of time for the series. func (tr *seriesFileTracker) migrate(from, through model.Time, path string, app tsdb.Appender) error { fpStr := tr.fp.String() filename := filepath.Join(path, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix) f, err := os.Open(filename) if err != nil { return fmt.Errorf("error opening chunk file: %s", err) } defer f.Close() buf := make([]byte, chunkLenWithHeader) // TODO: Share? for { if _, err := f.Seek(int64(tr.curChunk*chunkLenWithHeader), io.SeekStart); err != nil { return fmt.Errorf("error seeking to chunk index %d in file %s: %s", tr.curChunk, filename, err) } n, err := f.Read(buf) // TODO: read multiple chunks at once? if err == io.EOF { // TODO tr.curChunk = -1 return nil } if err != nil { return fmt.Errorf("error reading chunk %d for file %s: %s", tr.curChunk, filename, err) } if n != chunkLenWithHeader { return fmt.Errorf("read incomplete chunk (%d bytes) for file %s at chunk index %d", n, filename, tr.curChunk) } // TODO: skip chunks with first time > through in the external header. firstTime := model.Time(binary.LittleEndian.Uint64(buf[chunkHeaderFirstTimeOffset:])) if firstTime > through { return nil } c, err := chunk.NewForEncoding(chunk.Encoding(buf[chunkHeaderTypeOffset])) if err != nil { return fmt.Errorf("unable to create chunk from index %d in file %s: %s", tr.curChunk, filename, err) } c.UnmarshalFromBuf(buf[chunkHeaderLen:]) it := c.NewIterator() for it.Scan() { sp := it.Value() if sp.Timestamp.Before(from) { // Skip through this chunk until we hit a sample within our range. continue } if sp.Timestamp.After(through) { // We've found a sample past our range, everything afterwards we don't care about // anymore. return nil } if tr.v2Ref != 0 { tr.v2Ref, err = app.Add(tr.labels, int64(sp.Timestamp), float64(sp.Value)) if err != nil { if err == tsdb.ErrNotFound && tr.skipUnknownLabels { continue } return fmt.Errorf("v2Ref non-empty; unable to add to v2 storage: %s", err) } } else { err := app.AddFast(tr.v2Ref, int64(sp.Timestamp), float64(sp.Value)) switch errors.Cause(err) { case nil: // All good. case tsdb.ErrNotFound: tr.v2Ref, err = app.Add(tr.labels, int64(sp.Timestamp), float64(sp.Value)) if err != nil { if err == tsdb.ErrNotFound && tr.skipUnknownLabels { continue } return fmt.Errorf("unable to add to v2 storage: %s", err) } default: return fmt.Errorf("unable to fast-add to v2 storage: %s", err) } } } if it.Err() != nil { return fmt.Errorf("error iterating through chunk %d in file %s: %s", tr.curChunk, filename, err) } tr.curChunk++ } } // done returns true if all chunks for a series have been migrated. func (tr *seriesFileTracker) done() bool { return tr.curChunk == -1 } // storageMigrator migrates an entire v1 storage directory to the v2 format. type storageMigrator struct { v1Path string v2Storage tsdb.Appendable // How many series files to migrate at the same time within a given migration time step. parallelism int trackers []*seriesFileTracker fLock flock.Releaser skipUnknownLabels bool } // migrate does a complete storage migration from v1 to v2 format. func (m *storageMigrator) migrate(ctx context.Context, lookback time.Duration, step time.Duration) error { if err := m.drainHeads(); err != nil { return fmt.Errorf("error draining heads: %s", err) } if err := m.lockV1Storage(); err != nil { return fmt.Errorf("error locking v1 storage: %s", err) } defer m.unlockV1Storage() if err := m.initTrackers(ctx); err != nil { return fmt.Errorf("error initializing series file migrator: %s", err) } totalSteps := (lookback / step).Nanoseconds() bar := pb.StartNew(int(totalSteps)) for t := model.Now().Add(-lookback); !m.done(); t = t.Add(step) { bar.Increment() if err := m.migrateStep(ctx, t, t.Add(step)-1, m.v2Storage); err != nil { return fmt.Errorf("error migrating time step: %s", err) } } bar.FinishPrint("Migration complete.") return nil } // drainHeads persists all outstanding v1 chunks to their series files. func (m *storageMigrator) drainHeads() error { v1Storage := v1.NewMemorySeriesStorage(&v1.MemorySeriesStorageOptions{ // Create maximum persistence pressure. TargetHeapSize: 1, // Ensure that no old data is thrown away. PersistenceRetentionPeriod: 999999 * time.Hour, PersistenceStoragePath: m.v1Path, // Make sure that head chunks are closed and persisted as soon as possible. HeadChunkTimeout: 0, // We don't need intermediate checkpoints. CheckpointInterval: 999999 * time.Hour, CheckpointDirtySeriesLimit: 1e9, MinShrinkRatio: 0.5, SyncStrategy: v1.Adaptive, }) if err := v1Storage.Start(); err != nil { return fmt.Errorf("error starting v1 storage: %s", err) } for ; !v1Storage.HeadsDrained(); time.Sleep(time.Second) { // TODO: add timeout. } if err := v1Storage.Stop(); err != nil { return fmt.Errorf("error stopping v1 storage: %s", err) } return nil } func (m *storageMigrator) lockV1Storage() error { dirtyPath := filepath.Join(m.v1Path, dirtyFileName) versionPath := filepath.Join(m.v1Path, versionFileName) if versionData, err := ioutil.ReadFile(versionPath); err == nil { if persistedVersion, err := strconv.Atoi(strings.TrimSpace(string(versionData))); err != nil { return fmt.Errorf("cannot parse content of %q: %s", versionPath, versionData) } else if persistedVersion != storageVersion { return fmt.Errorf("found v1 storage storage version %d on disk, need version %d", persistedVersion, storageVersion) } } else if os.IsNotExist(err) { return fmt.Errorf("no version file found in v1 storage directory, cannot migrate storage with unknown version") } fLock, dirtyfileExisted, err := flock.New(dirtyPath) if err != nil { return fmt.Errorf("unable to lock %q: %s", dirtyPath, err) } if dirtyfileExisted { // This can only happen if the v1 directory becomes dirty for some reason // between draining the heads file and locking the directory manually here. return fmt.Errorf("v1 storage dir is dirty, please perform a crash recovery before retrying the migration") } m.fLock = fLock return nil } func (m *storageMigrator) unlockV1Storage() error { dirtyPath := filepath.Join(m.v1Path, dirtyFileName) var lastError error dirtyFileRemoveError := os.Remove(dirtyPath) if err := m.fLock.Release(); err != nil { lastError = err // TODO: Do we want to log this error if it's not the last? } if dirtyFileRemoveError != nil { // On Windows, removing the dirty file before unlocking is not // possible. So remove it here if it failed above. lastError = os.Remove(dirtyPath) } return lastError } // initTrackers initializes the migrator's series trackers from the v1 storage dir. // It scans the v1 dir for all series files and looks up the label set for each. func (m *storageMigrator) initTrackers(ctx context.Context) error { archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(m.v1Path) if err != nil { return fmt.Errorf("error opening fingerprint-metric index: %s", err) } for i := 0; i < 1<<(seriesDirNameLen*4); i++ { select { case <-ctx.Done(): return ctx.Err() default: } dirname := filepath.Join(m.v1Path, fmt.Sprintf(seriesDirNameFmt, i)) dir, err := os.Open(dirname) if os.IsNotExist(err) { continue } if err != nil { return err } for fis := []os.FileInfo{}; err != io.EOF; fis, err = dir.Readdir(1024) { if err != nil { dir.Close() return err } for _, fi := range fis { fp, err := model.FingerprintFromString(filepath.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]) if err != nil { return fmt.Errorf("error parsing file name %s: %s", filepath.Join(dirname, fi.Name()), err) } metric, ok, err := archivedFingerprintToMetrics.Lookup(fp) if err != nil { return fmt.Errorf("error looking up metric for fingerprint %v: %s", fp, err) } if !ok { return fmt.Errorf("unable to find metric for fingerprint %v", fp) } ls := make(labels.Labels, 0, len(metric)) for k, v := range metric { ls = append(ls, labels.Label{Name: string(k), Value: string(v)}) } m.trackers = append(m.trackers, &seriesFileTracker{ fp: fp, curChunk: 0, labels: labels.New(ls...), skipUnknownLabels: m.skipUnknownLabels, }) } } } return nil } // migrateStep migrates a range of time for all series. func (m *storageMigrator) migrateStep(ctx context.Context, from, through model.Time, appendable tsdb.Appendable) error { g, ctx := errgroup.WithContext(ctx) trackers := make(chan *seriesFileTracker) g.Go(func() error { defer close(trackers) for _, tr := range m.trackers { if tr.done() { continue } select { case trackers <- tr: case <-ctx.Done(): return ctx.Err() } } return nil }) for i := 0; i < m.parallelism; i++ { g.Go(func() error { for tr := range trackers { app := appendable.Appender() if err := tr.migrate(from, through, m.v1Path, app); err != nil { app.Rollback() // TODO: Log error? return err } if err := app.Commit(); err != nil { return fmt.Errorf("error committing samples: %s", err) } select { case <-ctx.Done(): return ctx.Err() default: } } return nil }) } return g.Wait() } // done returns true if there are no chunks to migrate anymore for any series. func (m *storageMigrator) done() bool { // TODO: Change this to remove series trackers completely when done? for _, tr := range m.trackers { if !tr.done() { return false } } return true }