func()

in loader/stream_loader.go [245:318]


func (s *StreamLoad) send(url string, reader io.Reader, workerIndex int, taskIndex int) (*http.Response, error) {
	realUrl := url
	for {
		req, err := s.createRequest(realUrl, reader, workerIndex, taskIndex)
		if err != nil {
			if req == nil {
				return nil, err
			} else {
				return req.Response, err
			}
		}

		// create client
		client := &http.Client{
			// max retry 10 times
			CheckRedirect: func(req *http.Request, via []*http.Request) error {
				return fmt.Errorf("redirect")
			},
			Timeout: time.Duration(s.Timeout) * time.Second,
		}

		// send request
		resp, err := client.Do(req)
		if err != nil {
			return resp, err
		}
		defer resp.Body.Close()

		// read response
		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			log.Errorf("Read stream load response failed, response body: %s, error message: %v, it is hard to judge whether load success or not, we suggest:\n1.Do select count(*) to check whether data is partially loaded.\n2.If the data is partially loaded and duplication is unacceptable, consider dropping the table (with caution that all data in the table will be lost) and retry.", body, err)
			return resp, nil
		}

		if resp.StatusCode == 307 {
			realUrl = resp.Header.Get("Location")
			if realUrl == "" {
				log.Errorf("Send error, redirectUrl is empty, error message: %v", err)
				return resp, nil
			}
			log.Infof("redirect to %s", realUrl)
			continue
		}

		// check response
		if resp.StatusCode != 200 {
			// print response headers
			log.Debugf("response headers: %v", resp.Header)
			return resp, fmt.Errorf("response code is not 200, response code: %d, response body: %s", resp.StatusCode, body)
		}

		// parse response
		var respMsg StreamLoadResp
		err = json.Unmarshal(body, &respMsg)
		if err != nil {
			log.Errorf("Parse stream load response failed, response body: %s, error message: %v, it is hard to judge whether load success or not, we suggest:\n1.Do select count(*) to check whether data is partially loaded\n2.If the data is partially loaded and duplication is unacceptable, consider dropping the table (with caution that all data in the table will be lost) and retry.\n", body, err)
			return resp, nil
		}
		if respMsg.Status != "Success" && respMsg.Status != "Publish Timeout" {
			return resp, fmt.Errorf("stream load failed, response %s", body)
		}

		log.Infof("resp: %s", body)

		// update resp
		atomic.AddUint64(&s.loadResp.LoadedRows, uint64(respMsg.NumberLoadedRows))
		atomic.AddUint64(&s.loadResp.FilteredRows, uint64(respMsg.NumberFilteredRows))
		atomic.AddUint64(&s.loadResp.UnselectedRows, uint64(respMsg.NumberUnselectedRows))
		atomic.AddUint64(&s.loadResp.LoadBytes, uint64(respMsg.LoadBytes))

		return resp, nil
	}
}