in pkg/api/api.go [159:252]
// Bulk handles a bulk request made of newline-delimited JSON action lines.
//
// The handler first draws a request-level status at random from h.MethodOdds;
// if that status is 413 (Request Entity Too Large) it is returned immediately
// and the body is not read. Otherwise the body is read line by line
// (transparently gunzipped when Content-Encoding is "gzip"), a metric is
// recorded for each recognised action, and for every "create" action a
// per-item status is drawn at random from h.ActionOdds and appended to the
// response. The request is then passed to h.recordRequest and the
// BulkResponse is written back as JSON.
//
// h.MethodOdds and h.ActionOdds must be non-empty: rand.Intn panics when
// given a length of zero.
func (h *APIHandler) Bulk(w http.ResponseWriter, r *http.Request) {
	// Upper bound for a single line of the bulk body. bufio.Scanner stops with
	// ErrTooLong at 64 KiB by default, which a large document line can exceed.
	const maxBulkLineSize = 16 << 20

	h.configMu.RLock()
	defer h.configMu.RUnlock()

	ctx := context.Background()
	attrs := metric.WithAttributeSet(requestAttributes(r))
	h.metrics.bulkCreateTotalMetrics.Add(ctx, 1, attrs)

	methodStatus := h.MethodOdds[rand.Intn(len(h.MethodOdds))]
	if methodStatus == http.StatusRequestEntityTooLarge {
		h.metrics.bulkCreateTooLargeMetrics.Add(ctx, 1, attrs)
		w.WriteHeader(methodStatus)
		return
	}

	var scanner *bufio.Scanner
	if r.Header.Get("Content-Encoding") == "gzip" {
		zr, err := gzip.NewReader(r.Body)
		if err != nil {
			log.Printf("error new gzip reader failed: %s", err)
			// Tell the client the payload was unusable instead of falling
			// through to an implicit 200 with an empty body.
			http.Error(w, "invalid gzip request body", http.StatusBadRequest)
			return
		}
		defer zr.Close()
		scanner = bufio.NewScanner(zr)
	} else {
		scanner = bufio.NewScanner(r.Body)
	}
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxBulkLineSize)

	br := BulkResponse{}

	// bulk requests come in as 2 lines
	// the action on first line, followed by the document on the next line.
	// we only care about the action, which is why we have skipNextLine var
	// eg:
	// { "update": {"_id": "5", "_index": "index1"} }
	// { "doc": {"my_field": "baz"} }
	var skipNextLine bool
	var body []byte
	for scanner.Scan() {
		b := scanner.Bytes()
		// NOTE(review): the scanner strips line terminators, so the recorded
		// body is the lines concatenated without separators — confirm that is
		// what recordRequest expects.
		body = append(body, b...)
		if skipNextLine || len(b) == 0 {
			skipNextLine = false
			continue
		}

		var j map[string]any
		if err := json.Unmarshal(b, &j); err != nil {
			log.Printf("error unmarshal: %s", err)
			continue
		}
		if len(j) != 1 {
			log.Printf("error, number of keys off: %d should be 1", len(j))
			continue
		}

		// j holds exactly one key here: the action name.
		for k := range j {
			switch k {
			case "index":
				h.metrics.bulkIndexTotalMetrics.Add(ctx, 1, attrs)
				skipNextLine = true
			case "create":
				skipNextLine = true
				actionStatus := h.ActionOdds[rand.Intn(len(h.ActionOdds))]
				switch actionStatus {
				case http.StatusOK:
					h.metrics.bulkCreateOkMetrics.Add(ctx, 1, attrs)
				case http.StatusConflict:
					br.Errors = true
					h.metrics.bulkCreateDuplicateMetrics.Add(ctx, 1, attrs)
				case http.StatusTooManyRequests:
					br.Errors = true
					h.metrics.bulkCreateTooManyMetrics.Add(ctx, 1, attrs)
				case http.StatusNotAcceptable:
					br.Errors = true
					h.metrics.bulkCreateNonIndexMetrics.Add(ctx, 1, attrs)
				}
				br.Items = append(br.Items, map[string]any{"created": map[string]any{"status": actionStatus}})
			case "update":
				h.metrics.bulkUpdateTotalMetrics.Add(ctx, 1, attrs)
				skipNextLine = true
			case "delete":
				// delete actions carry no document line.
				h.metrics.bulkDeleteTotalMetrics.Add(ctx, 1, attrs)
				skipNextLine = false
			}
		}
	}
	// Scan returns false on both EOF and failure; surface the failure case
	// (read error, corrupt gzip stream, over-long line) instead of silently
	// treating a truncated body as complete. The response is still built from
	// whatever was processed, as before.
	if err := scanner.Err(); err != nil {
		log.Printf("error reading bulk body: %s", err)
	}

	h.recordRequest(r, body)

	brBytes, err := json.Marshal(br)
	if err != nil {
		log.Printf("error marshal bulk reply: %s", err)
		http.Error(w, "failed to encode bulk response", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	if _, err := w.Write(brBytes); err != nil {
		log.Printf("error writing bulk reply: %s", err)
	}
}