func()

in loader/stream_loader.go [147:180]


// createRequest builds the HTTP PUT request that uploads reader to url.
//
// The request carries basic-auth credentials taken from s.userName and
// s.password, the default "Expect: 100-continue" and "Content-Type: text/plain"
// headers, and every entry of s.headers (which may override those defaults).
// A configured label header is suffixed with "_<workerIndex>_<taskIndex>" so
// that requests issued for different workers/tasks do not share one label.
// When s.Compress is set, the "Content-Encoding: lz4" and "compress_type: LZ4"
// headers are applied after s.headers and therefore take precedence.
//
// If http.NewRequest fails, its error is returned together with a nil request.
func (s *StreamLoad) createRequest(url string, reader io.Reader, workerIndex int, taskIndex int) (req *http.Request, err error) {
	req, err = http.NewRequest(http.MethodPut, url, reader)
	if err != nil {
		return nil, err
	}

	req.SetBasicAuth(s.userName, s.password)
	req.Header.Set("Expect", "100-continue")
	req.Header.Set("Content-Type", "text/plain")

	labelSuffix := "_" + strconv.Itoa(workerIndex) + "_" + strconv.Itoa(taskIndex)
	for k, v := range s.headers {
		// Header.Set canonicalizes keys, so "label", "Label" and "LABEL" all
		// address the same header. Match case-insensitively so the uniqueness
		// suffix is applied no matter how the key was spelled in the config.
		if strings.EqualFold(k, "label") {
			v += labelSuffix
		}
		req.Header.Set(k, v)
	}

	if s.Compress {
		req.Header.Set("Content-Encoding", "lz4")
		req.Header.Set("compress_type", "LZ4")
	}

	return req, nil
}