func handleReq()

in systemtest/gencorpora/catbulk.go [163:229]


func handleReq(metaUpdateChan chan docsStat, writer io.Writer) http.HandlerFunc {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		w.Header().Set("X-Elastic-Product", "Elasticsearch")
		switch req.Method {
		case http.MethodGet:
			w.WriteHeader(http.StatusOK)
			w.Write([]byte(`{"cluster_uuid": "cat_bulk"}`))
		case http.MethodPost:
			reader := req.Body
			defer req.Body.Close()

			if encoding := req.Header.Get("Content-Encoding"); encoding == "gzip" {
				var err error
				reader, err = gzip.NewReader(reader)
				if err != nil {
					log.Println("failed to read request body", err)
					w.WriteHeader(http.StatusBadRequest)
					return
				}
				defer reader.Close()
			}

			mockResp := esutil.BulkIndexerResponse{}
			scanner := bufio.NewScanner(reader)
			scanner.Split(splitMetadataAndSource)

			var stat docsStat
			for scanner.Scan() {
				n, err := writer.Write(scanner.Bytes())
				if err != nil {
					// Discard the request without processing further
					log.Println("failed to write ES corpora to a file", err)
					w.WriteHeader(http.StatusInternalServerError)
					return
				}

				stat.count++
				stat.bytes += n

				item := map[string]esutil.BulkIndexerResponseItem{
					"action": {Status: http.StatusOK},
				}
				mockResp.Items = append(mockResp.Items, item)
			}

			if err := scanner.Err(); err != nil {
				log.Println("failed to read ES corpora", err)
				w.WriteHeader(http.StatusBadRequest)
				return
			}

			// Update metadata with the ES document statistics generated by this request
			metaUpdateChan <- stat

			resp, err := json.Marshal(mockResp)
			if err != nil {
				log.Println("failed to encode response to JSON", err)
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			w.WriteHeader(http.StatusOK)
			w.Write(resp)
		default:
			w.WriteHeader(http.StatusNotFound)
		}
	})
}