func()

in series_migrator.go [62:143]


// migrate copies the samples of this tracker's series from its v1 series file
// into the v2 storage via app.
//
// The series file is located under path, derived from the series fingerprint.
// Reading starts at chunk index tr.curChunk and proceeds chunk by chunk. Every
// sample whose timestamp lies within [from, through] is appended to app;
// samples before from are skipped.
//
// migrate returns nil in three situations:
//   - the end of the file is reached, in which case tr.curChunk is set to -1;
//   - a chunk whose first timestamp is after through is found;
//   - a sample after through is found.
//
// In the latter two cases tr.curChunk is left pointing at the chunk that was
// being examined. Any I/O, decoding, iteration, or append failure is returned
// as an error. If tr.skipUnknownLabels is set, samples for which app.Add
// reports tsdb.ErrNotFound are silently dropped.
func (tr *seriesFileTracker) migrate(from, through model.Time, path string, app tsdb.Appender) error {
	fpStr := tr.fp.String()
	filename := filepath.Join(path, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix)
	f, err := os.Open(filename)
	if err != nil {
		return fmt.Errorf("error opening chunk file: %s", err)
	}
	defer f.Close()

	buf := make([]byte, chunkLenWithHeader) // TODO: Share?
	for {
		// Widen before multiplying so the byte offset cannot overflow a
		// narrower integer type on large files.
		offset := int64(tr.curChunk) * int64(chunkLenWithHeader)
		if _, err := f.Seek(offset, io.SeekStart); err != nil {
			return fmt.Errorf("error seeking to chunk index %d in file %s: %s", tr.curChunk, filename, err)
		}

		// io.ReadFull retries short reads; it reports io.EOF only when no
		// bytes at all were read and io.ErrUnexpectedEOF for a partial chunk.
		n, err := io.ReadFull(f, buf) // TODO: read multiple chunks at once?
		switch err {
		case nil:
			// Got a complete chunk.
		case io.EOF:
			// TODO
			tr.curChunk = -1
			return nil
		case io.ErrUnexpectedEOF:
			return fmt.Errorf("read incomplete chunk (%d bytes) for file %s at chunk index %d", n, filename, tr.curChunk)
		default:
			return fmt.Errorf("error reading chunk %d for file %s: %s", tr.curChunk, filename, err)
		}

		// The external chunk header carries the chunk's first timestamp, so
		// chunks starting after the requested range can be rejected without
		// decoding them. NOTE(review): returning here (rather than moving on
		// to the next chunk) presumably relies on chunks being stored in
		// ascending time order — confirm.
		firstTime := model.Time(binary.LittleEndian.Uint64(buf[chunkHeaderFirstTimeOffset:]))
		if firstTime > through {
			return nil
		}

		c, err := chunk.NewForEncoding(chunk.Encoding(buf[chunkHeaderTypeOffset]))
		if err != nil {
			return fmt.Errorf("unable to create chunk from index %d in file %s: %s", tr.curChunk, filename, err)
		}
		if err := c.UnmarshalFromBuf(buf[chunkHeaderLen:]); err != nil {
			return fmt.Errorf("unable to unmarshal chunk %d in file %s: %s", tr.curChunk, filename, err)
		}

		it := c.NewIterator()
		for it.Scan() {
			sp := it.Value()
			if sp.Timestamp.Before(from) {
				// Skip through this chunk until we hit a sample within our range.
				continue
			}
			if sp.Timestamp.After(through) {
				// We've found a sample past our range, everything afterwards we don't care about
				// anymore.
				return nil
			}
			ts, val := int64(sp.Timestamp), float64(sp.Value)

			// Use the cached series reference when we have one; fall back to
			// the label-based Add when we have none or the appender no longer
			// knows the reference.
			needAdd := tr.v2Ref == 0
			if !needAdd {
				err := app.AddFast(tr.v2Ref, ts, val)
				switch errors.Cause(err) {
				case nil:
					// All good.
				case tsdb.ErrNotFound:
					needAdd = true
				default:
					return fmt.Errorf("unable to fast-add to v2 storage: %s", err)
				}
			}
			if needAdd {
				var err error
				tr.v2Ref, err = app.Add(tr.labels, ts, val)
				if err != nil {
					if errors.Cause(err) == tsdb.ErrNotFound && tr.skipUnknownLabels {
						continue
					}
					return fmt.Errorf("unable to add to v2 storage: %s", err)
				}
			}
		}
		// Report the iterator's own error, not an unrelated earlier one.
		if err := it.Err(); err != nil {
			return fmt.Errorf("error iterating through chunk %d in file %s: %s", tr.curChunk, filename, err)
		}
		tr.curChunk++
	}
}