in client/incremental_http.go [87:195]
func (p *Client) IncrementalHTTP(ctx context.Context, query string, options ...Option) *IncrementalHandler {
r, err := p.newRequest(query, options...)
if err != nil {
return errorIncremental(fmt.Errorf("request: %w", err))
}
r.Header.Set("Accept", "multipart/mixed")
w := httptest.NewRecorder()
p.h.ServeHTTP(w, r)
res := w.Result() //nolint:bodyclose // Remains open since we are reading from it incrementally.
if res.StatusCode >= http.StatusBadRequest {
return errorIncremental(fmt.Errorf("http %d: %s", w.Code, w.Body.String()))
}
mediaType, params, err := mime.ParseMediaType(res.Header.Get("Content-Type"))
if err != nil {
return errorIncremental(fmt.Errorf("parse content-type: %w", err))
}
if mediaType != "multipart/mixed" {
return errorIncremental(fmt.Errorf("expected content-type multipart/mixed, got %s", mediaType))
}
// TODO: worth checking the deferSpec either to confirm this client
// supports it exactly, or simply to make sure it is within some
// expected range.
deferSpec, ok := params["deferspec"]
if !ok || deferSpec == "" {
return errorIncremental(errors.New("expected deferSpec in content-type"))
}
boundary, ok := params["boundary"]
if !ok || boundary == "" {
return errorIncremental(errors.New("expected boundary in content-type"))
}
mr := multipart.NewReader(res.Body, boundary)
ctx, cancel := context.WithCancelCause(ctx)
initial := true
return &IncrementalHandler{
close: func() error {
res.Body.Close()
cancel(context.Canceled)
return nil
},
next: func(response any) (err error) {
defer func() {
if err != nil {
res.Body.Close()
cancel(err)
}
}()
var data any
var rawErrors json.RawMessage
type nextPart struct {
*multipart.Part
Err error
}
nextPartCh := make(chan nextPart)
go func() {
var next nextPart
next.Part, next.Err = mr.NextPart()
nextPartCh <- next
}()
var next nextPart
select {
case <-ctx.Done():
return ctx.Err()
case next = <-nextPartCh:
}
if next.Err == io.EOF {
res.Body.Close()
cancel(context.Canceled)
return nil
}
if err = next.Err; err != nil {
return err
}
if ct := next.Header.Get("Content-Type"); ct != "application/json" {
err = fmt.Errorf(`expected content-type "application/json", got %q`, ct)
return err
}
if initial {
initial = false
data = IncrementalInitialResponse{}
} else {
data = IncrementalResponse{}
}
if err = json.NewDecoder(next.Part).Decode(&data); err != nil {
return err
}
// We want to unpack even if there is an error, so we can see partial
// responses.
err = unpack(data, response, p.dc)
if len(rawErrors) != 0 {
err = RawJsonError{rawErrors}
return err
}
return err
},
}
}