in loader/stream_loader.go [147:180]
func (s *StreamLoad) createRequest(url string, reader io.Reader, workerIndex int, taskIndex int) (req *http.Request, err error) {
req, err = http.NewRequest("PUT", url, reader)
if err != nil {
return
}
// set auth
req.SetBasicAuth(s.userName, s.password)
req.Header.Set("Expect", "100-continue")
req.Header.Set("Content-Type", "text/plain")
for k, v := range s.headers {
req.Header.Set(k, v)
// If a label has already been set in the headers, to prevent conflicts,
//generate a unique label by combining the original label, worker index, and task index.
if k == "label" {
var builder strings.Builder
builder.WriteString(v)
builder.WriteString("_")
builder.WriteString(strconv.Itoa(workerIndex))
builder.WriteString("_")
builder.WriteString(strconv.Itoa(taskIndex))
req.Header.Set("label", builder.String())
}
}
if s.Compress {
req.Header.Set("Content-Encoding", "lz4")
req.Header.Set("compress_type", "LZ4")
}
return
}