func()

in client/incremental_http.go [87:195]


func (p *Client) IncrementalHTTP(ctx context.Context, query string, options ...Option) *IncrementalHandler {
	r, err := p.newRequest(query, options...)
	if err != nil {
		return errorIncremental(fmt.Errorf("request: %w", err))
	}
	r.Header.Set("Accept", "multipart/mixed")

	w := httptest.NewRecorder()
	p.h.ServeHTTP(w, r)

	res := w.Result() //nolint:bodyclose // Remains open since we are reading from it incrementally.
	if res.StatusCode >= http.StatusBadRequest {
		return errorIncremental(fmt.Errorf("http %d: %s", w.Code, w.Body.String()))
	}
	mediaType, params, err := mime.ParseMediaType(res.Header.Get("Content-Type"))
	if err != nil {
		return errorIncremental(fmt.Errorf("parse content-type: %w", err))
	}
	if mediaType != "multipart/mixed" {
		return errorIncremental(fmt.Errorf("expected content-type multipart/mixed, got %s", mediaType))
	}

	// TODO: worth checking the deferSpec either to confirm this client
	// supports it exactly, or simply to make sure it is within some
	// expected range.
	deferSpec, ok := params["deferspec"]
	if !ok || deferSpec == "" {
		return errorIncremental(errors.New("expected deferSpec in content-type"))
	}

	boundary, ok := params["boundary"]
	if !ok || boundary == "" {
		return errorIncremental(errors.New("expected boundary in content-type"))
	}
	mr := multipart.NewReader(res.Body, boundary)

	ctx, cancel := context.WithCancelCause(ctx)
	initial := true

	return &IncrementalHandler{
		close: func() error {
			res.Body.Close()
			cancel(context.Canceled)
			return nil
		},
		next: func(response any) (err error) {
			defer func() {
				if err != nil {
					res.Body.Close()
					cancel(err)
				}
			}()

			var data any
			var rawErrors json.RawMessage

			type nextPart struct {
				*multipart.Part
				Err error
			}

			nextPartCh := make(chan nextPart)
			go func() {
				var next nextPart
				next.Part, next.Err = mr.NextPart()
				nextPartCh <- next
			}()

			var next nextPart
			select {
			case <-ctx.Done():
				return ctx.Err()
			case next = <-nextPartCh:
			}

			if next.Err == io.EOF {
				res.Body.Close()
				cancel(context.Canceled)
				return nil
			}
			if err = next.Err; err != nil {
				return err
			}
			if ct := next.Header.Get("Content-Type"); ct != "application/json" {
				err = fmt.Errorf(`expected content-type "application/json", got %q`, ct)
				return err
			}

			if initial {
				initial = false
				data = IncrementalInitialResponse{}
			} else {
				data = IncrementalResponse{}
			}
			if err = json.NewDecoder(next.Part).Decode(&data); err != nil {
				return err
			}

			// We want to unpack even if there is an error, so we can see partial
			// responses.
			err = unpack(data, response, p.dc)
			if len(rawErrors) != 0 {
				err = RawJsonError{rawErrors}
				return err
			}
			return err
		},
	}
}