in pipe/file.go [332:370]
// seek resolves a symbolic offset for topic into a starting file name and a
// byte position within that file.
//
// It lists the directory that contains the topic's path and considers only
// non-directory entries whose full path begins with the topic path:
//
//   - OffsetOldest picks the first matching entry, at position 0. If that
//     entry's name ends in ".open", an empty name is returned instead.
//   - OffsetNewest picks the last matching entry, positioned at its current
//     size.
//
// An empty name with a nil error means no starting file was selected: the
// directory does not exist, the listing is empty, or nothing matched. Any
// other offset value yields an error (only reached when the listing is
// non-empty).
//
// NOTE(review): "first"/"last" rely on the order p.fs.ReadDir returns entries
// in — presumably oldest first; confirm against the fs implementation.
func (p *fileConsumer) seek(topic string, offset int64) (string, int64, error) {
	tp := p.topicPath(topic)
	dir := filepath.Dir(tp)
	files, err := p.fs.ReadDir(dir, tp)
	if err != nil {
		if os.IsNotExist(err) {
			return "", 0, nil
		}
		return "", 0, err
	}
	if len(files) == 0 {
		return "", 0, nil
	}

	// filepath.Dir returns a cleaned, OS-separated path, so the candidate path
	// and the topic path must be normalized the same way before comparing.
	// Gluing with a literal "/" never matches when dir is "." or the root
	// directory, or when the platform separator is not "/".
	prefix := filepath.Clean(tp)
	belongs := func(name string) bool {
		return strings.HasPrefix(filepath.Join(dir, name), prefix)
	}

	switch offset {
	case OffsetOldest:
		for _, f := range files {
			if f.IsDir() || !belongs(f.Name()) {
				continue
			}
			// Only the first matching entry is examined; if it carries the
			// ".open" suffix, no file is selected.
			if strings.HasSuffix(f.Name(), ".open") {
				return "", 0, nil
			}
			return f.Name(), 0, nil
		}
		return "", 0, nil
	case OffsetNewest:
		for i := len(files) - 1; i >= 0; i-- {
			f := files[i]
			if f.IsDir() || !belongs(f.Name()) {
				continue
			}
			// Start at the end of the last matching file.
			return f.Name(), f.Size(), nil
		}
		return "", 0, nil
	}
	return "", 0, fmt.Errorf("arbitrary offsets not supported, only OffsetOldest and OffsetNewest offsets supported")
}