in bulk/job.go [510:558]
func (j *Job) UnprocessedRecords() ([]UnprocessedRecord, error) {
url := j.session.ServiceURL() + bulk2Endpoint + "/" + j.info.ID + "/unprocessedrecords/"
request, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
request.Header.Add("Accept", "text/csv")
j.session.AuthorizationHeader(request)
response, err := j.session.Client().Do(request)
if err != nil {
return nil, err
}
if response.StatusCode != http.StatusOK {
decoder := json.NewDecoder(response.Body)
defer response.Body.Close()
var errs []sfdc.Error
err = decoder.Decode(&errs)
var errMsg error
if err == nil {
for _, err := range errs {
errMsg = fmt.Errorf("job err: %s: %s", err.ErrorCode, err.Message)
}
} else {
errMsg = fmt.Errorf("job err: %d %s", response.StatusCode, response.Status)
}
return nil, errMsg
}
scanner := bufio.NewScanner(response.Body)
defer response.Body.Close()
scanner.Split(bufio.ScanLines)
var records []UnprocessedRecord
delimiter := j.delimiter()
columns, err := j.recordResultHeader(scanner, delimiter)
if err != nil {
return nil, err
}
fields := j.fields(columns, 0)
for scanner.Scan() {
var record UnprocessedRecord
values := strings.Split(scanner.Text(), delimiter)
record.Fields = j.record(fields, values)
records = append(records, record)
}
return records, nil
}