in loader/stream_loader.go [245:318]
// send issues the stream load request built by s.createRequest for url and
// follows HTTP 307 redirects itself, building a fresh request for every new
// location.
//
// When the server reports status "Success" or "Publish Timeout", the row and
// byte counters from the response are added atomically to s.loadResp.
//
// The returned response has already had its body read and closed, so callers
// can rely on its status and headers only.
//
// A nil error is returned on a successful load, and also when the response
// body cannot be read or parsed: the outcome of the load is unknown in those
// cases, so they are logged instead of being reported as failures. A non-nil
// error is returned when the request cannot be created or sent, when a
// redirect has no Location header, when more than maxRedirects redirects are
// received, when the status code is not 200, or when the server reports a
// failed load.
func (s *StreamLoad) send(url string, reader io.Reader, workerIndex int, taskIndex int) (*http.Response, error) {
	// maxRedirects bounds the number of 307 hops so that a server which keeps
	// redirecting cannot make this loop spin forever.
	const maxRedirects = 10

	// The client does not depend on the target URL, so build it once.
	client := &http.Client{
		// Refuse automatic redirects: 307 responses are handled by the loop
		// below so that each hop goes through s.createRequest again.
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			return fmt.Errorf("redirect")
		},
		Timeout: time.Duration(s.Timeout) * time.Second,
	}

	target := url
	for redirects := 0; ; redirects++ {
		// NOTE(review): the same reader is handed to every attempt; this
		// presumably relies on the redirecting server not consuming the
		// body before it answers 307 - confirm against createRequest.
		req, err := s.createRequest(target, reader, workerIndex, taskIndex)
		if err != nil {
			if req == nil {
				return nil, err
			}
			return req.Response, err
		}

		resp, err := client.Do(req)
		if err != nil {
			return resp, err
		}

		body, err := ioutil.ReadAll(resp.Body)
		// Close immediately rather than with defer: a deferred close inside
		// the loop would keep every redirect hop's body open until send
		// returns. The body is fully consumed (or broken) at this point, so
		// a close error carries no extra information.
		_ = resp.Body.Close()
		if err != nil {
			log.Errorf("Read stream load response failed, response body: %s, error message: %v, it is hard to judge whether load success or not, we suggest:\n1.Do select count(*) to check whether data is partially loaded.\n2.If the data is partially loaded and duplication is unacceptable, consider dropping the table (with caution that all data in the table will be lost) and retry.", body, err)
			return resp, nil
		}

		if resp.StatusCode == http.StatusTemporaryRedirect {
			location := resp.Header.Get("Location")
			if location == "" {
				// Nothing was loaded and there is nowhere to retry, so this
				// must surface as a failure instead of a silent success.
				return resp, fmt.Errorf("redirect location is empty, response code: %d, response body: %s", resp.StatusCode, body)
			}
			if redirects >= maxRedirects {
				return resp, fmt.Errorf("stopped after %d redirects, last location: %s", maxRedirects, location)
			}
			target = location
			log.Infof("redirect to %s", target)
			continue
		}

		if resp.StatusCode != http.StatusOK {
			// Headers help diagnose rejections that carry no useful body.
			log.Debugf("response headers: %v", resp.Header)
			return resp, fmt.Errorf("response code is not 200, response code: %d, response body: %s", resp.StatusCode, body)
		}

		var respMsg StreamLoadResp
		if err := json.Unmarshal(body, &respMsg); err != nil {
			log.Errorf("Parse stream load response failed, response body: %s, error message: %v, it is hard to judge whether load success or not, we suggest:\n1.Do select count(*) to check whether data is partially loaded\n2.If the data is partially loaded and duplication is unacceptable, consider dropping the table (with caution that all data in the table will be lost) and retry.\n", body, err)
			return resp, nil
		}
		if respMsg.Status != "Success" && respMsg.Status != "Publish Timeout" {
			return resp, fmt.Errorf("stream load failed, response %s", body)
		}
		log.Infof("resp: %s", body)

		// Fold this request's counters into the shared totals; atomics are
		// used because s.loadResp is updated without any other locking here.
		atomic.AddUint64(&s.loadResp.LoadedRows, uint64(respMsg.NumberLoadedRows))
		atomic.AddUint64(&s.loadResp.FilteredRows, uint64(respMsg.NumberFilteredRows))
		atomic.AddUint64(&s.loadResp.UnselectedRows, uint64(respMsg.NumberUnselectedRows))
		atomic.AddUint64(&s.loadResp.LoadBytes, uint64(respMsg.LoadBytes))
		return resp, nil
	}
}