in packages/ssestream/ssestream.go [149:199]
func (s *Stream[T]) Next() bool {
if s.err != nil {
return false
}
for s.decoder.Next() {
if s.done {
continue
}
if bytes.HasPrefix(s.decoder.Event().Data, []byte("[DONE]")) {
// In this case we don't break because we still want to iterate through the full stream.
s.done = true
continue
}
var nxt T
if s.decoder.Event().Type == "" || strings.HasPrefix(s.decoder.Event().Type, "response.") {
ep := gjson.GetBytes(s.decoder.Event().Data, "error")
if ep.Exists() {
s.err = fmt.Errorf("received error while streaming: %s", ep.String())
return false
}
s.err = json.Unmarshal(s.decoder.Event().Data, &nxt)
if s.err != nil {
return false
}
s.cur = nxt
return true
} else {
ep := gjson.GetBytes(s.decoder.Event().Data, "error")
if ep.Exists() {
s.err = fmt.Errorf("received error while streaming: %s", ep.String())
return false
}
event := s.decoder.Event().Type
data := s.decoder.Event().Data
s.err = json.Unmarshal([]byte(fmt.Sprintf(`{ "event": %q, "data": %s }`, event, data)), &nxt)
if s.err != nil {
return false
}
s.cur = nxt
return true
}
}
// decoder.Next() may be false because of an error
s.err = s.decoder.Err()
return false
}