in systemtest/gencorpora/catbulk.go [163:229]
// handleReq returns an HTTP handler that stands in for Elasticsearch.
//
// Every response carries the "X-Elastic-Product: Elasticsearch" header.
//
//   - GET replies 200 with a fixed JSON body containing a cluster_uuid.
//   - POST reads the request body (gunzipping it when Content-Encoding is
//     exactly "gzip"), splits it with splitMetadataAndSource and writes each
//     resulting token to writer. For every token written, one item with
//     status 200 is appended to the mock bulk response. Once the whole body
//     has been processed, the accumulated token count and byte count are sent
//     on metaUpdateChan and the mock response is returned as JSON with 200.
//     A body that cannot be read or split yields 400; a failure to write to
//     writer or to encode the response yields 500. In both failure cases
//     nothing is sent on metaUpdateChan, even if some tokens were already
//     written to writer.
//   - Any other method replies 404.
//
// NOTE(review): the send on metaUpdateChan may block the handler until the
// channel's consumer is ready; writer is shared by all invocations of the
// returned handler, so it presumably must tolerate concurrent Write calls if
// the server handles requests in parallel — confirm against the caller.
func handleReq(metaUpdateChan chan docsStat, writer io.Writer) http.HandlerFunc {
	const (
		// scanBufInitSize is the initial capacity handed to the scanner; it
		// grows on demand up to maxBulkItemSize.
		scanBufInitSize = 4096
		// maxBulkItemSize caps the size of a single token returned by the
		// split function. bufio.Scanner's default cap is 64 KiB
		// (bufio.MaxScanTokenSize); larger tokens make Scan stop with
		// bufio.ErrTooLong, which would reject the whole bulk request.
		// NOTE(review): 16 MiB is a generous, arbitrary ceiling — adjust if
		// larger documents are expected.
		maxBulkItemSize = 16 << 20
	)

	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		w.Header().Set("X-Elastic-Product", "Elasticsearch")
		switch req.Method {
		case http.MethodGet:
			w.WriteHeader(http.StatusOK)
			if _, err := w.Write([]byte(`{"cluster_uuid": "cat_bulk"}`)); err != nil {
				log.Println("failed to write response", err)
			}
		case http.MethodPost:
			reader := req.Body
			defer req.Body.Close()
			if encoding := req.Header.Get("Content-Encoding"); encoding == "gzip" {
				var err error
				reader, err = gzip.NewReader(reader)
				if err != nil {
					log.Println("failed to read request body", err)
					w.WriteHeader(http.StatusBadRequest)
					return
				}
				// Closes the gzip reader only; the underlying request body
				// is closed by the deferred req.Body.Close above.
				defer reader.Close()
			}

			mockResp := esutil.BulkIndexerResponse{}
			scanner := bufio.NewScanner(reader)
			// Buffer must be configured before the first call to Scan.
			scanner.Buffer(make([]byte, 0, scanBufInitSize), maxBulkItemSize)
			scanner.Split(splitMetadataAndSource)

			var stat docsStat
			for scanner.Scan() {
				n, err := writer.Write(scanner.Bytes())
				if err != nil {
					// Discard the request without processing further
					log.Println("failed to write ES corpora to a file", err)
					w.WriteHeader(http.StatusInternalServerError)
					return
				}
				stat.count++
				stat.bytes += n
				// Acknowledge every written token as successfully indexed.
				item := map[string]esutil.BulkIndexerResponseItem{
					"action": {Status: http.StatusOK},
				}
				mockResp.Items = append(mockResp.Items, item)
			}
			if err := scanner.Err(); err != nil {
				log.Println("failed to read ES corpora", err)
				w.WriteHeader(http.StatusBadRequest)
				return
			}

			// Update metadata with the ES document statistics generated by this request
			metaUpdateChan <- stat

			resp, err := json.Marshal(mockResp)
			if err != nil {
				log.Println("failed to encode response to JSON", err)
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
			w.WriteHeader(http.StatusOK)
			if _, err := w.Write(resp); err != nil {
				log.Println("failed to write response", err)
			}
		default:
			w.WriteHeader(http.StatusNotFound)
		}
	})
}