graphql/handler/transport/http_multipart_mixed.go (200 lines of code) (raw):

package transport import ( "encoding/json" "fmt" "io" "log" "mime" "net/http" "strings" "sync" "time" "github.com/vektah/gqlparser/v2/gqlerror" "github.com/99designs/gqlgen/graphql" ) // MultipartMixed is a transport that supports the multipart/mixed spec type MultipartMixed struct { Boundary string DeliveryTimeout time.Duration } var _ graphql.Transport = MultipartMixed{} // Supports checks if the request supports the multipart/mixed spec // Might be worth check the spec required, but Apollo Client mislabel the spec in the headers. func (t MultipartMixed) Supports(r *http.Request) bool { if !strings.Contains(r.Header.Get("Accept"), "multipart/mixed") { return false } mediaType, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")) if err != nil { return false } return r.Method == http.MethodPost && mediaType == "application/json" } // Do implements the multipart/mixed spec as a multipart/mixed response func (t MultipartMixed) Do(w http.ResponseWriter, r *http.Request, exec graphql.GraphExecutor) { // Implements the multipart/mixed spec as a multipart/mixed response: // * https://github.com/graphql/graphql-wg/blob/e4ef5f9d5997815d9de6681655c152b6b7838b4c/rfcs/DeferStream.md // 2022/08/23 as implemented by gqlgen. // * https://github.com/graphql/graphql-wg/blob/f22ea7748c6ebdf88fdbf770a8d9e41984ebd429/rfcs/DeferStream.md June 2023 Spec for the // `incremental` field // * https://github.com/graphql/graphql-over-http/blob/main/rfcs/IncrementalDelivery.md // multipart specification // Follows the format that is used in the Apollo Client tests: // https://github.com/apollographql/apollo-client/blob/v3.11.8/src/link/http/__tests__/responseIterator.ts#L68 // Apollo Client, despite mentioning in its requests that they require the 2022 spec, it wants the // `incremental` field to be an array of responses, not a single response. Theoretically we could // batch responses in the `incremental` field, if we wanted to optimize this code. ctx := r.Context() flusher, ok := w.(http.Flusher) if !ok { SendErrorf(w, http.StatusInternalServerError, "streaming unsupported") return } defer flusher.Flush() w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") // This header will be replaced below, but it's required in case we return errors. w.Header().Set("Content-Type", "application/json") boundary := t.Boundary if boundary == "" { boundary = "-" } timeout := t.DeliveryTimeout if timeout.Milliseconds() < 1 { // If the timeout is less than 1ms, we'll set it to 1ms to avoid a busy loop timeout = 1 * time.Millisecond } params := &graphql.RawParams{} start := graphql.Now() params.Headers = r.Header params.ReadTime = graphql.TraceTiming{ Start: start, End: graphql.Now(), } bodyString, err := getRequestBody(r) if err != nil { gqlErr := gqlerror.Errorf("could not get json request body: %+v", err) resp := exec.DispatchError(ctx, gqlerror.List{gqlErr}) log.Printf("could not get json request body: %+v", err.Error()) writeJson(w, resp) return } bodyReader := io.NopCloser(strings.NewReader(bodyString)) if err = jsonDecode(bodyReader, &params); err != nil { w.WriteHeader(http.StatusBadRequest) gqlErr := gqlerror.Errorf( "json request body could not be decoded: %+v body:%s", err, bodyString, ) resp := exec.DispatchError(ctx, gqlerror.List{gqlErr}) log.Printf("decoding error: %+v body:%s", err.Error(), bodyString) writeJson(w, resp) return } rc, opErr := exec.CreateOperationContext(ctx, params) ctx = graphql.WithOperationContext(ctx, rc) if opErr != nil { w.WriteHeader(statusFor(opErr)) resp := exec.DispatchError(ctx, opErr) writeJson(w, resp) return } // Example of the response format (note the new lines and boundaries are important!): // https://github.com/graphql/graphql-over-http/blob/main/rfcs/IncrementalDelivery.md // --graphql // Content-Type: application/json // // {"data":{"apps":{"apps":[ .. ],"totalNumApps":161,"__typename":"AppsOutput"}},"hasNext":true} // --graphql // Content-Type: application/json // // {"incremental":[{"data":{"groupAccessCount":0},"label":"test","path":["apps","apps",7],"hasNext":true}],"hasNext":true} // --graphql // ... // --graphql-- // Last boundary is a closing boundary with two dashes at the end. w.Header().Set( "Content-Type", fmt.Sprintf(`multipart/mixed;boundary="%s";deferSpec=20220824`, boundary), ) a := newMultipartResponseAggregator(w, boundary, timeout) defer a.Done(w) responses, ctx := exec.DispatchOperation(ctx, rc) initialResponse := true for { response := responses(ctx) if response == nil { break } a.Add(response, initialResponse) initialResponse = false } } func writeIncrementalJson(w io.Writer, responses []*graphql.Response, hasNext bool) { // TODO: Remove this wrapper on response once gqlgen supports the 2023 spec b, err := json.Marshal(struct { Incremental []*graphql.Response `json:"incremental"` HasNext bool `json:"hasNext"` }{ Incremental: responses, HasNext: hasNext, }) if err != nil { panic(err) } w.Write(b) } func writeBoundary(w io.Writer, boundary string, finalResponse bool) { if finalResponse { fmt.Fprintf(w, "--%s--\r\n", boundary) return } fmt.Fprintf(w, "--%s\r\n", boundary) } func writeContentTypeHeader(w io.Writer) { fmt.Fprintf(w, "Content-Type: application/json\r\n\r\n") } // multipartResponseAggregator helps us reduce the number of responses sent to the frontend by batching all the // incremental responses together. type multipartResponseAggregator struct { mu sync.Mutex boundary string initialResponse *graphql.Response deferResponses []*graphql.Response done chan bool } // newMultipartResponseAggregator creates a new multipartResponseAggregator // The aggregator will flush responses to the client every `tickerDuration` (default 1ms) so that // multiple incremental responses are batched together. func newMultipartResponseAggregator( w http.ResponseWriter, boundary string, tickerDuration time.Duration, ) *multipartResponseAggregator { a := &multipartResponseAggregator{ boundary: boundary, done: make(chan bool, 1), } go func() { ticker := time.NewTicker(tickerDuration) defer ticker.Stop() for { select { case <-a.done: return case <-ticker.C: a.flush(w) } } }() return a } // Done flushes the remaining responses func (a *multipartResponseAggregator) Done(w http.ResponseWriter) { a.done <- true a.flush(w) } // Add accumulates the responses func (a *multipartResponseAggregator) Add(resp *graphql.Response, initialResponse bool) { a.mu.Lock() defer a.mu.Unlock() if initialResponse { a.initialResponse = resp return } a.deferResponses = append(a.deferResponses, resp) } // flush sends the accumulated responses to the client func (a *multipartResponseAggregator) flush(w http.ResponseWriter) { a.mu.Lock() defer a.mu.Unlock() // If we don't have any responses, we can return early if a.initialResponse == nil && len(a.deferResponses) == 0 { return } flusher, ok := w.(http.Flusher) if !ok { // This should never happen, as we check for this much earlier on panic("response writer does not support flushing") } hasNext := false if a.initialResponse != nil { // Initial response will need to begin with the boundary writeBoundary(w, a.boundary, false) writeContentTypeHeader(w) writeJson(w, a.initialResponse) hasNext = a.initialResponse.HasNext != nil && *a.initialResponse.HasNext // Handle when initial is aggregated with deferred responses. if len(a.deferResponses) > 0 { fmt.Fprintf(w, "\r\n") writeBoundary(w, a.boundary, false) } // Reset the initial response so we don't send it again a.initialResponse = nil } if len(a.deferResponses) > 0 { writeContentTypeHeader(w) // Note: while the 2023 spec that includes "incremental" does not // explicitly list the fields that should be included as part of the // incremental object, it shows hasNext only on the response payload // (marking the status of the operation as a whole), and instead the // response payload implements pending and complete fields to mark the // status of the incrementally delivered data. // // TODO: use the "HasNext" status of deferResponses items to determine // the operation status and pending / complete fields, but remove from // the incremental (deferResponses) object. hasNext = a.deferResponses[len(a.deferResponses)-1].HasNext != nil && *a.deferResponses[len(a.deferResponses)-1].HasNext writeIncrementalJson(w, a.deferResponses, hasNext) // Reset the deferResponses so we don't send them again a.deferResponses = nil } // Make sure to put the delimiter after every request, so that Apollo Client knows that the // current payload has been sent, and updates the UI. This is particular important for the first // response and the last response, which may either hang or never get handled. // Final response will have a closing boundary with two dashes at the end. fmt.Fprintf(w, "\r\n") writeBoundary(w, a.boundary, !hasNext) flusher.Flush() }