bulk/jobs.go (102 lines of code) (raw):

package bulk import ( "encoding/json" "errors" "fmt" "net/http" "strconv" sfdc "github.com/elastic/go-sfdc" "github.com/elastic/go-sfdc/session" ) // Parameters to query all of the bulk jobs. // // IsPkChunkingEnabled will filter jobs with PK chunking enabled. // // JobType will filter jobs based on job type. type Parameters struct { IsPkChunkingEnabled bool JobType JobType } type jobResponse struct { Done bool `json:"done"` Records []Response `json:"records"` NextRecordsURL string `json:"nextRecordsUrl"` } // Jobs presents the response from the all jobs request. type Jobs struct { session session.ServiceFormatter response jobResponse } func newJobs(session session.ServiceFormatter, parameters Parameters) (*Jobs, error) { j := &Jobs{ session: session, } url := session.ServiceURL() + bulk2Endpoint request, err := j.request(url) if err != nil { return nil, err } q := request.URL.Query() q.Add("isPkChunkingEnabled", strconv.FormatBool(parameters.IsPkChunkingEnabled)) q.Add("jobType", string(parameters.JobType)) request.URL.RawQuery = q.Encode() response, err := j.do(request) if err != nil { return nil, err } j.response = response return j, nil } // Done indicates whether there are more jobs to get. func (j *Jobs) Done() bool { return j.response.Done } // Records contains the information for each retrieved job. func (j *Jobs) Records() []Response { return j.response.Records } // Next will retrieve the next batch of job information. func (j *Jobs) Next() (*Jobs, error) { if j.Done() == true { return nil, errors.New("jobs: there is no more records") } request, err := j.request(j.response.NextRecordsURL) if err != nil { return nil, err } response, err := j.do(request) if err != nil { return nil, err } return &Jobs{ session: j.session, response: response, }, nil } func (j *Jobs) request(url string) (*http.Request, error) { request, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { return nil, err } request.Header.Add("Accept", "application/json") j.session.AuthorizationHeader(request) return request, nil } func (j *Jobs) do(request *http.Request) (jobResponse, error) { response, err := j.session.Client().Do(request) if err != nil { return jobResponse{}, err } decoder := json.NewDecoder(response.Body) defer response.Body.Close() if response.StatusCode != http.StatusOK { var jobsErrs []sfdc.Error err = decoder.Decode(&jobsErrs) var errMsg error if err == nil { for _, jobErr := range jobsErrs { errMsg = fmt.Errorf("insert response err: %s: %s", jobErr.ErrorCode, jobErr.Message) } } else { errMsg = fmt.Errorf("insert response err: %d %s", response.StatusCode, response.Status) } return jobResponse{}, errMsg } var value jobResponse err = decoder.Decode(&value) if err != nil { return jobResponse{}, err } return value, nil }