client/incremental_http.go (143 lines of code) (raw):
package client
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"mime"
"mime/multipart"
"net/http"
"net/http/httptest"
)
type IncrementalHandler struct {
close func() error
next func(response any) error
}
func (i *IncrementalHandler) Close() error {
return i.close()
}
func (i *IncrementalHandler) Next(response any) error {
return i.next(response)
}
type IncrementalInitialResponse struct {
Data any `json:"data"`
Label string `json:"label"`
Path []any `json:"path"`
HasNext bool `json:"hasNext"`
Errors json.RawMessage `json:"errors"`
Extensions map[string]any `json:"extensions"`
}
type IncrementalData struct {
// Support for "items" for @stream is not yet available, only "data" for
// @defer, as per the 2023 spec. Similarly, this retains a more complete
// list of fields, but not "id," and represents a mid-point between the
// 2022 and 2023 specs.
Data any `json:"data"`
Label string `json:"label"`
Path []any `json:"path"`
HasNext bool `json:"hasNext"`
Errors json.RawMessage `json:"errors"`
Extensions map[string]any `json:"extensions"`
}
type IncrementalResponse struct {
// Does not include the pending or completed fields from the 2023 spec.
Incremental []IncrementalData `json:"incremental"`
HasNext bool `json:"hasNext"`
Errors json.RawMessage `json:"errors"`
Extensions map[string]any `json:"extensions"`
}
func errorIncremental(err error) *IncrementalHandler {
return &IncrementalHandler{
close: func() error { return nil },
next: func(response any) error {
return err
},
}
}
// IncrementalHTTP returns a GraphQL response handler for the current
// GQLGen implementation of the [incremental delivery over HTTP spec].
// The IncrementalHTTP spec provides for "streaming" responses triggered by
// the use of @stream or @defer as an alternate approach to SSE. To that end,
// the client retains the interface of the handler returned from
// Client.SSE.
//
// IncrementalHTTP delivery using multipart/mixed is just the structure
// of the response: the payloads are specified by the defer-stream spec,
// which are in transition. For more detail, see the links in the
// definition for transport.MultipartMixed. We use the name
// IncrementalHTTP here to distinguish from the multipart form upload
// (the term "multipart" usually referring to the latter).
//
// IncrementalHandler is not safe for concurrent use, or for production
// use at all.
//
// [incremental delivery over HTTP spec]: https://github.com/graphql/graphql-over-http/blob/main/rfcs/IncrementalDelivery.md
func (p *Client) IncrementalHTTP(ctx context.Context, query string, options ...Option) *IncrementalHandler {
r, err := p.newRequest(query, options...)
if err != nil {
return errorIncremental(fmt.Errorf("request: %w", err))
}
r.Header.Set("Accept", "multipart/mixed")
w := httptest.NewRecorder()
p.h.ServeHTTP(w, r)
res := w.Result() //nolint:bodyclose // Remains open since we are reading from it incrementally.
if res.StatusCode >= http.StatusBadRequest {
return errorIncremental(fmt.Errorf("http %d: %s", w.Code, w.Body.String()))
}
mediaType, params, err := mime.ParseMediaType(res.Header.Get("Content-Type"))
if err != nil {
return errorIncremental(fmt.Errorf("parse content-type: %w", err))
}
if mediaType != "multipart/mixed" {
return errorIncremental(fmt.Errorf("expected content-type multipart/mixed, got %s", mediaType))
}
// TODO: worth checking the deferSpec either to confirm this client
// supports it exactly, or simply to make sure it is within some
// expected range.
deferSpec, ok := params["deferspec"]
if !ok || deferSpec == "" {
return errorIncremental(errors.New("expected deferSpec in content-type"))
}
boundary, ok := params["boundary"]
if !ok || boundary == "" {
return errorIncremental(errors.New("expected boundary in content-type"))
}
mr := multipart.NewReader(res.Body, boundary)
ctx, cancel := context.WithCancelCause(ctx)
initial := true
return &IncrementalHandler{
close: func() error {
res.Body.Close()
cancel(context.Canceled)
return nil
},
next: func(response any) (err error) {
defer func() {
if err != nil {
res.Body.Close()
cancel(err)
}
}()
var data any
var rawErrors json.RawMessage
type nextPart struct {
*multipart.Part
Err error
}
nextPartCh := make(chan nextPart)
go func() {
var next nextPart
next.Part, next.Err = mr.NextPart()
nextPartCh <- next
}()
var next nextPart
select {
case <-ctx.Done():
return ctx.Err()
case next = <-nextPartCh:
}
if next.Err == io.EOF {
res.Body.Close()
cancel(context.Canceled)
return nil
}
if err = next.Err; err != nil {
return err
}
if ct := next.Header.Get("Content-Type"); ct != "application/json" {
err = fmt.Errorf(`expected content-type "application/json", got %q`, ct)
return err
}
if initial {
initial = false
data = IncrementalInitialResponse{}
} else {
data = IncrementalResponse{}
}
if err = json.NewDecoder(next.Part).Decode(&data); err != nil {
return err
}
// We want to unpack even if there is an error, so we can see partial
// responses.
err = unpack(data, response, p.dc)
if len(rawErrors) != 0 {
err = RawJsonError{rawErrors}
return err
}
return err
},
}
}