func()

in pkg/api/api.go [159:252]


func (h *APIHandler) Bulk(w http.ResponseWriter, r *http.Request) {
	h.configMu.RLock()
	defer h.configMu.RUnlock()

	attrs := metric.WithAttributeSet(requestAttributes(r))
	h.metrics.bulkCreateTotalMetrics.Add(context.Background(), 1, attrs)
	methodStatus := h.MethodOdds[rand.Intn(len(h.MethodOdds))]
	if methodStatus == http.StatusRequestEntityTooLarge {
		h.metrics.bulkCreateTooLargeMetrics.Add(context.Background(), 1, attrs)
		w.WriteHeader(methodStatus)
		return
	}

	var scanner *bufio.Scanner
	br := BulkResponse{}
	encoding, prs := r.Header[http.CanonicalHeaderKey("Content-Encoding")]
	switch {
	case prs && encoding[0] == "gzip":
		zr, err := gzip.NewReader(r.Body)
		if err != nil {
			log.Printf("error new gzip reader failed: %s", err)
			return
		}
		scanner = bufio.NewScanner(zr)
	default:
		scanner = bufio.NewScanner(r.Body)
	}
	// bulk requests come in as 2 lines
	// the action on first line, followed by the document on the next line.
	// we only care about the action, which is why we have skipNextLine var
	// eg:
	// { "update": {"_id": "5", "_index": "index1"} }
	// { "doc": {"my_field": "baz"} }

	var skipNextLine bool
	var body []byte
	for scanner.Scan() {
		b := scanner.Bytes()
		body = append(body, b...)
		if skipNextLine || len(b) == 0 {
			skipNextLine = false
			continue
		}
		var j map[string]any
		err := json.Unmarshal(b, &j)
		if err != nil {
			log.Printf("error unmarshal: %s", err)
			continue
		}
		if len(j) != 1 {
			log.Printf("error, number of keys off: %d should be 1", len(j))
			continue
		}
		for k := range j {
			switch k {
			case "index":
				h.metrics.bulkIndexTotalMetrics.Add(context.Background(), 1, attrs)
				skipNextLine = true
			case "create":
				skipNextLine = true
				actionStatus := h.ActionOdds[rand.Intn(len(h.ActionOdds))]
				switch actionStatus {
				case http.StatusOK:
					h.metrics.bulkCreateOkMetrics.Add(context.Background(), 1, attrs)
				case http.StatusConflict:
					br.Errors = true
					h.metrics.bulkCreateDuplicateMetrics.Add(context.Background(), 1, attrs)
				case http.StatusTooManyRequests:
					br.Errors = true
					h.metrics.bulkCreateTooManyMetrics.Add(context.Background(), 1, attrs)
				case http.StatusNotAcceptable:
					br.Errors = true
					h.metrics.bulkCreateNonIndexMetrics.Add(context.Background(), 1, attrs)
				}
				br.Items = append(br.Items, map[string]any{"created": map[string]any{"status": actionStatus}})
			case "update":
				h.metrics.bulkUpdateTotalMetrics.Add(context.Background(), 1, attrs)
				skipNextLine = true
			case "delete":
				h.metrics.bulkDeleteTotalMetrics.Add(context.Background(), 1, attrs)
				skipNextLine = false
			}
		}
	}
	h.recordRequest(r, body)
	brBytes, err := json.Marshal(br)
	if err != nil {
		log.Printf("error marshal bulk reply: %s", err)
		return
	}
	w.Header().Set(http.CanonicalHeaderKey("Content-Type"), "application/json")
	w.Write(brBytes)
	return
}