func()

in pipe/file.go [332:370]


// seek resolves a symbolic offset for topic into a file name and a byte
// position within that file.
//
// The directory holding the topic path is listed through p.fs.ReadDir. Only
// entries that are not directories and whose path (directory + "/" + name)
// starts with the topic path are considered, in the order ReadDir returned
// them.
//
//   - OffsetOldest selects the first such entry at position 0. If that entry's
//     name ends in ".open", nothing is selected.
//   - OffsetNewest selects the last such entry, positioned at its current size.
//
// An empty name with a nil error means there is nothing to seek to: the
// directory does not exist, the listing is empty, or no entry qualified.
// Listing errors other than "not exist" are returned unchanged. Any other
// offset value produces an error, but only when the listing is non-empty.
func (p *fileConsumer) seek(topic string, offset int64) (string, int64, error) {
	tp := p.topicPath(topic)
	dir := filepath.Dir(tp)

	files, err := p.fs.ReadDir(dir, tp)
	if err != nil {
		if !os.IsNotExist(err) {
			return "", 0, err
		}
		// A missing directory simply means nothing has been written yet.
		return "", 0, nil
	}

	// Must stay ahead of the offset dispatch: an empty listing yields the
	// empty result even for offsets that would otherwise be rejected.
	if len(files) == 0 {
		return "", 0, nil
	}

	// belongs reports whether files[i] is a regular entry under the topic path.
	base := dir + "/"
	belongs := func(i int) bool {
		return strings.HasPrefix(base+files[i].Name(), tp) && !files[i].IsDir()
	}

	switch offset {
	case OffsetOldest:
		for i := range files {
			if !belongs(i) {
				continue
			}
			name := files[i].Name()
			if strings.HasSuffix(name, ".open") {
				// The first candidate carries the ".open" suffix; report nothing.
				return "", 0, nil
			}
			return name, 0, nil
		}
		return "", 0, nil

	case OffsetNewest:
		for i := len(files) - 1; i >= 0; i-- {
			if belongs(i) {
				return files[i].Name(), files[i].Size(), nil
			}
		}
		return "", 0, nil
	}

	return "", 0, fmt.Errorf("arbitrary offsets not supported, only OffsetOldest and OffsetNewest offsets supported")
}