in series_migrator.go [62:143]
func (tr *seriesFileTracker) migrate(from, through model.Time, path string, app tsdb.Appender) error {
fpStr := tr.fp.String()
filename := filepath.Join(path, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix)
f, err := os.Open(filename)
if err != nil {
return fmt.Errorf("error opening chunk file: %s", err)
}
defer f.Close()
buf := make([]byte, chunkLenWithHeader) // TODO: Share?
for {
if _, err := f.Seek(int64(tr.curChunk*chunkLenWithHeader), io.SeekStart); err != nil {
return fmt.Errorf("error seeking to chunk index %d in file %s: %s", tr.curChunk, filename, err)
}
n, err := f.Read(buf) // TODO: read multiple chunks at once?
if err == io.EOF {
// TODO
tr.curChunk = -1
return nil
}
if err != nil {
return fmt.Errorf("error reading chunk %d for file %s: %s", tr.curChunk, filename, err)
}
if n != chunkLenWithHeader {
return fmt.Errorf("read incomplete chunk (%d bytes) for file %s at chunk index %d", n, filename, tr.curChunk)
}
// TODO: skip chunks with first time > through in the external header.
firstTime := model.Time(binary.LittleEndian.Uint64(buf[chunkHeaderFirstTimeOffset:]))
if firstTime > through {
return nil
}
c, err := chunk.NewForEncoding(chunk.Encoding(buf[chunkHeaderTypeOffset]))
if err != nil {
return fmt.Errorf("unable to create chunk from index %d in file %s: %s", tr.curChunk, filename, err)
}
c.UnmarshalFromBuf(buf[chunkHeaderLen:])
it := c.NewIterator()
for it.Scan() {
sp := it.Value()
if sp.Timestamp.Before(from) {
// Skip through this chunk until we hit a sample within our range.
continue
}
if sp.Timestamp.After(through) {
// We've found a sample past our range, everything afterwards we don't care about
// anymore.
return nil
}
if tr.v2Ref != 0 {
tr.v2Ref, err = app.Add(tr.labels, int64(sp.Timestamp), float64(sp.Value))
if err != nil {
if err == tsdb.ErrNotFound && tr.skipUnknownLabels {
continue
}
return fmt.Errorf("v2Ref non-empty; unable to add to v2 storage: %s", err)
}
} else {
err := app.AddFast(tr.v2Ref, int64(sp.Timestamp), float64(sp.Value))
switch errors.Cause(err) {
case nil:
// All good.
case tsdb.ErrNotFound:
tr.v2Ref, err = app.Add(tr.labels, int64(sp.Timestamp), float64(sp.Value))
if err != nil {
if err == tsdb.ErrNotFound && tr.skipUnknownLabels {
continue
}
return fmt.Errorf("unable to add to v2 storage: %s", err)
}
default:
return fmt.Errorf("unable to fast-add to v2 storage: %s", err)
}
}
}
if it.Err() != nil {
return fmt.Errorf("error iterating through chunk %d in file %s: %s", tr.curChunk, filename, err)
}
tr.curChunk++
}
}