loader/stream_loader.go (348 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package loader import ( "doris-streamloader/report" "encoding/json" "fmt" "io" "io/ioutil" "net/http" "strconv" "strings" "sync" "sync/atomic" "time" "unicode/utf8" "github.com/pierrec/lz4/v4" log "github.com/sirupsen/logrus" ) type StreamLoadOption struct { Compress bool CheckUTF8 bool Timeout int } type StreamLoad struct { url string dbName string tableName string userName string password string headers map[string]string queues []chan []byte pool *sync.Pool wg sync.WaitGroup report *report.Reporter loadResp *Resp StreamLoadOption } // Stream load response type StreamLoadResp struct { TxnID int `json:"TxnId"` Label string `json:"Label"` Status string `json:"Status"` Message string `json:"Message"` NumberTotalRows int `json:"NumberTotalRows"` NumberLoadedRows int `json:"NumberLoadedRows"` NumberFilteredRows int `json:"NumberFilteredRows"` NumberUnselectedRows int `json:"NumberUnselectedRows"` LoadBytes int `json:"LoadBytes"` LoadTimeMs int `json:"LoadTimeMs"` BeginTxnTimeMs int `json:"BeginTxnTimeMs"` StreamLoadPutTimeMs int `json:"StreamLoadPutTimeMs"` ReadDataTimeMs int `json:"ReadDataTimeMs"` WriteDataTimeMs int `json:"WriteDataTimeMs"` CommitAndPublishTimeMs int `json:"CommitAndPublishTimeMs"` ErrorURL string `json:"ErrorURL"` } type Resp struct { Status string TotalRows uint64 FailLoadRows uint64 LoadedRows uint64 FilteredRows uint64 UnselectedRows uint64 LoadBytes uint64 LoadTimeMs int64 LoadFiles []string } type ReadOption struct { maxBytesPerTask int workerIndex int taskIndex int } type LoadInfo struct { SourceFilePaths string Url string DbName string TableName string UserName string Password string Compress bool Headers map[string]string Timeout int BatchRows int BatchBytes int MaxBytesPerTask int Debug bool Workers int DiskThroughput int StreamLoadThroughput int CheckUTF8 bool ReportDuration int NeedRetry bool RetryTimes int RetryInterval int } // create stream load from flags func NewStreamLoad(url, dbName, tableName, userName, password string, headers map[string]string, queues []chan []byte, pool *sync.Pool, option StreamLoadOption, report *report.Reporter, loadResp *Resp) *StreamLoad { return &StreamLoad{ url: url, dbName: dbName, tableName: tableName, userName: userName, password: password, headers: headers, queues: queues, pool: pool, wg: sync.WaitGroup{}, report: report, loadResp: loadResp, StreamLoadOption: option, } } // stream load create url func (s *StreamLoad) createUrl() string { return fmt.Sprintf("%s/api/%s/%s/_stream_load", s.url, s.dbName, s.tableName) } // stream load create http request with string data func (s *StreamLoad) createRequest(url string, reader io.Reader, workerIndex int, taskIndex int) (req *http.Request, err error) { req, err = http.NewRequest("PUT", url, reader) if err != nil { return } // set auth req.SetBasicAuth(s.userName, s.password) req.Header.Set("Expect", "100-continue") req.Header.Set("Content-Type", "text/plain") for k, v := range s.headers { req.Header.Set(k, v) // If a label has already been set in the headers, to prevent conflicts, //generate a unique label by combining the original label, worker index, and task index. if k == "label" { var builder strings.Builder builder.WriteString(v) builder.WriteString("_") builder.WriteString(strconv.Itoa(workerIndex)) builder.WriteString("_") builder.WriteString(strconv.Itoa(taskIndex)) req.Header.Set("label", builder.String()) } } if s.Compress { req.Header.Set("Content-Encoding", "lz4") req.Header.Set("compress_type", "LZ4") } return } // read data from queue, write data to pipe pw func (s *StreamLoad) readData(isEOS *atomic.Bool, rawWriter *io.PipeWriter, readOption *ReadOption) { defer rawWriter.Close() var writer io.Writer = rawWriter if s.Compress { // cWriter := gzip.NewWriter(rawWriter) cWriter := lz4.NewWriter(rawWriter) defer cWriter.Close() writer = cWriter } max_bytes_rows := readOption.maxBytesPerTask for { if max_bytes_rows <= 0 { break } // get data data, ok := <-s.queues[readOption.workerIndex] if !ok { isEOS.Store(true) break } max_bytes_rows -= len(data) // write all data to writer // s := string(data) // println(s) // println("xxxxxxxx") // if _, err := writer.Write([]byte(s)); err != nil { // panic(err) // } // var a string // TODO(Drogon): delete or valid var validUTF8Buffer []byte if s.CheckUTF8 { if utf8.Valid(data) { validUTF8Buffer = data } else { validUTF8Buffer = s.toValidUTF8(data) defer s.pool.Put(validUTF8Buffer[:0]) log.Error("The byte slice is not valid UTF-8 encoding") } } else { validUTF8Buffer = data } // b := []byte(string([]rune(string(data)))) // b := []byte(string(data)) // strings.ToValidUTF8("a\xc5z", "") if _, err := writer.Write(validUTF8Buffer); err != nil { s.handleSendError(readOption.workerIndex, readOption.taskIndex) log.Errorf("send error: %v", err) return } s.pool.Put(data[:0]) } } func (s *StreamLoad) send(url string, reader io.Reader, workerIndex int, taskIndex int) (*http.Response, error) { realUrl := url for { req, err := s.createRequest(realUrl, reader, workerIndex, taskIndex) if err != nil { if req == nil { return nil, err } else { return req.Response, err } } // create client client := &http.Client{ // max retry 10 times CheckRedirect: func(req *http.Request, via []*http.Request) error { return fmt.Errorf("redirect") }, Timeout: time.Duration(s.Timeout) * time.Second, } // send request resp, err := client.Do(req) if err != nil { return resp, err } defer resp.Body.Close() // read response body, err := ioutil.ReadAll(resp.Body) if err != nil { log.Errorf("Read stream load response failed, response body: %s, error message: %v, it is hard to judge whether load success or not, we suggest:\n1.Do select count(*) to check whether data is partially loaded.\n2.If the data is partially loaded and duplication is unacceptable, consider dropping the table (with caution that all data in the table will be lost) and retry.", body, err) return resp, nil } if resp.StatusCode == 307 { realUrl = resp.Header.Get("Location") if realUrl == "" { log.Errorf("Send error, redirectUrl is empty, error message: %v", err) return resp, nil } log.Infof("redirect to %s", realUrl) continue } // check response if resp.StatusCode != 200 { // print response headers log.Debugf("response headers: %v", resp.Header) return resp, fmt.Errorf("response code is not 200, response code: %d, response body: %s", resp.StatusCode, body) } // parse response var respMsg StreamLoadResp err = json.Unmarshal(body, &respMsg) if err != nil { log.Errorf("Parse stream load response failed, response body: %s, error message: %v, it is hard to judge whether load success or not, we suggest:\n1.Do select count(*) to check whether data is partially loaded\n2.If the data is partially loaded and duplication is unacceptable, consider dropping the table (with caution that all data in the table will be lost) and retry.\n", body, err) return resp, nil } if respMsg.Status != "Success" && respMsg.Status != "Publish Timeout" { return resp, fmt.Errorf("stream load failed, response %s", body) } log.Infof("resp: %s", body) // update resp atomic.AddUint64(&s.loadResp.LoadedRows, uint64(respMsg.NumberLoadedRows)) atomic.AddUint64(&s.loadResp.FilteredRows, uint64(respMsg.NumberFilteredRows)) atomic.AddUint64(&s.loadResp.UnselectedRows, uint64(respMsg.NumberUnselectedRows)) atomic.AddUint64(&s.loadResp.LoadBytes, uint64(respMsg.LoadBytes)) return resp, nil } } // convert []byte to utf-8 valid [] byte, if not valid, replace invalid char with U+FFFD func (s *StreamLoad) toValidUTF8(b []byte) []byte { invalid := []byte("\uFFFD") buf := s.pool.Get().([]byte) for len(b) > 0 { r, size := utf8.DecodeRune(b) if r == utf8.RuneError && size == 1 { buf = append(buf, invalid...) } else { buf = append(buf, b[:size]...) } b = b[size:] } return buf } func (s *StreamLoad) handleSendError(workerIndex int, taskIndex int) { s.report.Lock.Lock() s.report.FinishedWorkers += 1 s.report.FailedWorkers[workerIndex] = taskIndex s.report.Lock.Unlock() } // StreamLoad executeGetAndSend: get data from queue and send to server func (s *StreamLoad) executeGetAndSend(maxRowsPerTask int, maxBytesPerTask int, workerIndex int) { defer s.wg.Done() defer log.Info("execute worker exit") taskIndex := 1 for { url := s.createUrl() var isEOS atomic.Bool isEOS.Store(false) pr, pw := io.Pipe() go s.readData(&isEOS, pw, &ReadOption{ maxBytesPerTask: maxBytesPerTask, workerIndex: workerIndex, taskIndex: taskIndex, }) if resp, err := s.send(url, NopCloser(pr), workerIndex, taskIndex); err != nil { s.handleSendError(workerIndex, taskIndex) log.Errorf("Send error, resp: %v error message: %v", resp, err) return } else { log.Debugf("send success resp: %v", resp) } if isEOS.Load() { break } taskIndex++ } atomic.AddUint64(&s.report.TotalWorkers, 1) } func (s *StreamLoad) ExecuteGetAndSend(maxRowsPerTask int, maxBytesPerTask int, workerIndex int) { s.wg.Add(1) go s.executeGetAndSend(maxRowsPerTask, maxBytesPerTask, workerIndex) } func (s *StreamLoad) Load(workers int, maxRowsPerTask int, maxBytesPerTask int, retryInfo *map[int]int) { if len(*retryInfo) > 0 { for workerIndex := range *retryInfo { s.ExecuteGetAndSend(maxRowsPerTask, maxBytesPerTask, workerIndex) } } else { for i := 0; i < workers; i++ { s.ExecuteGetAndSend(maxRowsPerTask, maxBytesPerTask, i) } } } // Wait func (s *StreamLoad) Wait(loadInfo *LoadInfo, retryCount int, retryInfo *map[int]int, startTime time.Time) { s.wg.Wait() if len(s.report.FailedWorkers) == 0 { s.showLoadResult(true, startTime) loadInfo.NeedRetry = false return } // parse header var headerStrings []string for key, value := range loadInfo.Headers { headerStrings = append(headerStrings, fmt.Sprintf("%s:%s", key, value)) } headers := strings.Join(headerStrings, "?") if retryCount < loadInfo.RetryTimes-1 { log.Infof("\nAuto retrying, retry count %d waiting time %ds...\n", retryCount+1, loadInfo.RetryInterval) *retryInfo = make(map[int]int) for workerIndex, taskIndex := range s.report.FailedWorkers { (*retryInfo)[workerIndex] = taskIndex } s.report.FailedWorkers = make(map[int]int) s.loadResp.LoadFiles = []string{} return } //parse failed worker var FailedWorkersStrings []string for key, value := range s.report.FailedWorkers { FailedWorkersStrings = append(FailedWorkersStrings, fmt.Sprintf("%d,%d", key, value)) } failed := strings.Join(FailedWorkersStrings, ";") // retry command command := fmt.Sprintf("./doris_streamloader --source_file %s --url=\"%s\" --header=\"%s\" --db=\"%s\" --table=\"%s\" -u %s -p \"%s\" --compress=%t --timeout=%d --workers=%d --disk_throughput=%d --streamload_throughput=%d --batch=%d --batch_byte=%d --max_byte_per_task=%d --check_utf8=%t --report_duration=%d --auto_retry=\"%s\" --auto_retry_times=%d --auto_retry_interval=%d", loadInfo.SourceFilePaths, loadInfo.Url, headers, loadInfo.DbName, loadInfo.TableName, loadInfo.UserName, loadInfo.Password, loadInfo.Compress, loadInfo.Timeout, loadInfo.Workers, loadInfo.DiskThroughput, loadInfo.StreamLoadThroughput, loadInfo.BatchRows, loadInfo.BatchBytes, loadInfo.MaxBytesPerTask, loadInfo.CheckUTF8, loadInfo.ReportDuration, failed, loadInfo.RetryTimes, loadInfo.RetryInterval) fmt.Printf("load has some error, and auto retry failed, you can retry by : \n%s\n", command) s.showLoadResult(false, startTime) loadInfo.NeedRetry = false } func (s *StreamLoad) showLoadResult(isSuccessful bool, startTime time.Time) { endTime := time.Now() elapsed := endTime.Sub(startTime) s.loadResp.LoadTimeMs = int64(elapsed.Milliseconds()) s.loadResp.FailLoadRows = s.loadResp.TotalRows - s.loadResp.LoadedRows - s.loadResp.FilteredRows - s.loadResp.UnselectedRows result := "success" if !isSuccessful { s.loadResp.Status = "Failed" result = "fail" } jsonData, err := json.MarshalIndent(s.loadResp, "", "\t") if err != nil { log.Errorf("Load %s, but parse load result to json error: %v, can not show load result", result, err) return } fmt.Printf("Load Result: %s\n", jsonData) }