bulk/job.go (489 lines of code) (raw):

package bulk import ( "bufio" "bytes" "encoding/json" "errors" "fmt" "io" "net/http" "strconv" "strings" sfdc "github.com/elastic/go-sfdc" "github.com/elastic/go-sfdc/session" ) // JobType is the bulk job type. type JobType string const ( // BigObjects is the big objects job. BigObjects JobType = "BigObjectIngest" // Classic is the bulk job 1.0. Classic JobType = "Classic" // V2Ingest is the bulk job 2.0. V2Ingest JobType = "V2Ingest" ) // ColumnDelimiter is the column delimiter used for CSV job data. type ColumnDelimiter string const ( // Backquote is the (`) character. Backquote ColumnDelimiter = "BACKQUOTE" // Caret is the (^) character. Caret ColumnDelimiter = "CARET" // Comma is the (,) character. Comma ColumnDelimiter = "COMMA" // Pipe is the (|) character. Pipe ColumnDelimiter = "PIPE" // SemiColon is the (;) character. SemiColon ColumnDelimiter = "SEMICOLON" // Tab is the (\t) character. Tab ColumnDelimiter = "TAB" ) // ContentType is the format of the data being processed. type ContentType string // CSV is the supported content data type. const CSV ContentType = "CSV" // LineEnding is the line ending used for the CSV job data. type LineEnding string const ( // Linefeed is the (\n) character. Linefeed LineEnding = "LF" // CarriageReturnLinefeed is the (\r\n) character. CarriageReturnLinefeed LineEnding = "CRLF" ) // Operation is the processing operation for the job. type Operation string const ( // Insert is the object operation for inserting records. Insert Operation = "insert" // Delete is the object operation for deleting records. Delete Operation = "delete" // Update is the object operation for updating records. Update Operation = "update" // Upsert is the object operation for upserting records. Upsert Operation = "upsert" ) // State is the current state of processing for the job. type State string const ( // Open the job has been created and job data can be uploaded tothe job. 
Open State = "Open" // UpdateComplete all data for the job has been uploaded and the job is ready to be queued and processed. UpdateComplete State = "UploadComplete" // Aborted the job has been aborted. Aborted State = "Aborted" // JobComplete the job was processed by Salesforce. JobComplete State = "JobComplete" // Failed some records in the job failed. Failed State = "Failed" ) // UnprocessedRecord is the unprocessed records from the job. type UnprocessedRecord struct { Fields map[string]string } // JobRecord is the record for the job. Includes the Salesforce ID along with the fields. type JobRecord struct { ID string UnprocessedRecord } // SuccessfulRecord indicates for the record was created and the data that was uploaded. type SuccessfulRecord struct { Created bool JobRecord } // FailedRecord indicates why the record failed and the data of the record. type FailedRecord struct { Error string JobRecord } // Options are the options for the job. // // ColumnDelimiter is the delimiter used for the CSV job. This field is optional. // // ContentType is the content type for the job. This field is optional. // // ExternalIDFieldName is the external ID field in the object being updated. Only needed for // upsert operations. This field is required for upsert operations. // // LineEnding is the line ending used for the CSV job data. This field is optional. // // Object is the object type for the data bneing processed. This field is required. // // Operation is the processing operation for the job. This field is required. type Options struct { ColumnDelimiter ColumnDelimiter `json:"columnDelimiter"` ContentType ContentType `json:"contentType"` ExternalIDFieldName string `json:"externalIdFieldName"` LineEnding LineEnding `json:"lineEnding"` Object string `json:"object"` Operation Operation `json:"operation"` } // Response is the response to job APIs. 
type Response struct {
	APIVersion          float32 `json:"apiVersion"`
	ColumnDelimiter     string  `json:"columnDelimiter"`
	ConcurrencyMode     string  `json:"concurrencyMode"`
	ContentType         string  `json:"contentType"`
	ContentURL          string  `json:"contentUrl"`
	CreatedByID         string  `json:"createdById"`
	CreatedDate         string  `json:"createdDate"`
	ExternalIDFieldName string  `json:"externalIdFieldName"`
	ID                  string  `json:"id"`
	JobType             string  `json:"jobType"`
	LineEnding          string  `json:"lineEnding"`
	Object              string  `json:"object"`
	Operation           string  `json:"operation"`
	State               string  `json:"state"`
	SystemModstamp      string  `json:"systemModstamp"`
}

// Info is the response to the job information API.  It embeds Response and
// adds the processing statistics reported by Salesforce.
type Info struct {
	Response
	ApexProcessingTime      int    `json:"apexProcessingTime"`
	APIActiveProcessingTime int    `json:"apiActiveProcessingTime"`
	NumberRecordsFailed     int    `json:"numberRecordsFailed"`
	NumberRecordsProcessed  int    `json:"numberRecordsProcessed"`
	Retries                 int    `json:"retries"`
	TotalProcessingTime     int    `json:"totalProcessingTime"`
	ErrorMessage            string `json:"errorMessage"`
}

// Job is the bulk job.
type Job struct { session session.ServiceFormatter info Response } func (j *Job) create(options Options) error { err := j.formatOptions(&options) if err != nil { return err } j.info, err = j.createCallout(options) if err != nil { return err } return nil } func (j *Job) formatOptions(options *Options) error { if options.Operation == "" { return errors.New("bulk job: operation is required") } if options.Operation == Upsert { if options.ExternalIDFieldName == "" { return errors.New("bulk job: external id field name is required for upsert operation") } } if options.Object == "" { return errors.New("bulk job: object is required") } if options.LineEnding == "" { options.LineEnding = Linefeed } if options.ContentType == "" { options.ContentType = CSV } if options.ColumnDelimiter == "" { options.ColumnDelimiter = Comma } return nil } func (j *Job) createCallout(options Options) (Response, error) { url := j.session.ServiceURL() + bulk2Endpoint body, err := json.Marshal(options) if err != nil { return Response{}, err } request, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body)) if err != nil { return Response{}, err } request.Header.Add("Accept", "application/json") request.Header.Add("Content-Type", "application/json") j.session.AuthorizationHeader(request) return j.response(request) } func (j *Job) response(request *http.Request) (Response, error) { response, err := j.session.Client().Do(request) if err != nil { return Response{}, err } decoder := json.NewDecoder(response.Body) defer response.Body.Close() if response.StatusCode != http.StatusOK { var errs []sfdc.Error err = decoder.Decode(&errs) var errMsg error if err == nil { for _, err := range errs { errMsg = fmt.Errorf("insert response err: %s: %s", err.ErrorCode, err.Message) } } else { errMsg = fmt.Errorf("insert response err: %d %s", response.StatusCode, response.Status) } return Response{}, errMsg } var value Response err = decoder.Decode(&value) if err != nil { return Response{}, err } return 
value, nil } // Info returns the current job information. func (j *Job) Info() (Info, error) { url := j.session.ServiceURL() + bulk2Endpoint + "/" + j.info.ID request, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { return Info{}, err } request.Header.Add("Accept", "application/json") request.Header.Add("Content-Type", "application/json") j.session.AuthorizationHeader(request) return j.infoResponse(request) } func (j *Job) infoResponse(request *http.Request) (Info, error) { response, err := j.session.Client().Do(request) if err != nil { return Info{}, err } decoder := json.NewDecoder(response.Body) defer response.Body.Close() if response.StatusCode != http.StatusOK { var errs []sfdc.Error err = decoder.Decode(&errs) var errMsg error if err == nil { for _, err := range errs { errMsg = fmt.Errorf("job err: %s: %s", err.ErrorCode, err.Message) } } else { errMsg = fmt.Errorf("job err: %d %s", response.StatusCode, response.Status) } return Info{}, errMsg } var value Info err = decoder.Decode(&value) if err != nil { return Info{}, err } return value, nil } func (j *Job) setState(state State) (Response, error) { url := j.session.ServiceURL() + bulk2Endpoint + "/" + j.info.ID jobState := struct { State string `json:"state"` }{ State: string(state), } body, err := json.Marshal(jobState) if err != nil { return Response{}, err } request, err := http.NewRequest(http.MethodPatch, url, bytes.NewReader(body)) if err != nil { return Response{}, err } request.Header.Add("Accept", "application/json") request.Header.Add("Content-Type", "application/json") j.session.AuthorizationHeader(request) return j.response(request) } // Close will close the current job. func (j *Job) Close() (Response, error) { return j.setState(UpdateComplete) } // Abort will abort the current job. func (j *Job) Abort() (Response, error) { return j.setState(Aborted) } // Delete will delete the current job. 
func (j *Job) Delete() error { url := j.session.ServiceURL() + bulk2Endpoint + "/" + j.info.ID request, err := http.NewRequest(http.MethodDelete, url, nil) if err != nil { return err } j.session.AuthorizationHeader(request) response, err := j.session.Client().Do(request) if err != nil { return err } if response.StatusCode != http.StatusNoContent { return errors.New("job error: unable to delete job") } return nil } // Upload will upload data to processing. func (j *Job) Upload(body io.Reader) error { url := j.session.ServiceURL() + bulk2Endpoint + "/" + j.info.ID + "/batches" request, err := http.NewRequest(http.MethodPut, url, body) if err != nil { return err } request.Header.Add("Content-Type", "text/csv") j.session.AuthorizationHeader(request) response, err := j.session.Client().Do(request) if err != nil { return err } if response.StatusCode != http.StatusCreated { return errors.New("job error: unable to upload job") } return nil } // SuccessfulRecords returns the successful records for the job. 
func (j *Job) SuccessfulRecords() ([]SuccessfulRecord, error) { url := j.session.ServiceURL() + bulk2Endpoint + "/" + j.info.ID + "/successfulResults/" request, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { return nil, err } request.Header.Add("Accept", "text/csv") j.session.AuthorizationHeader(request) response, err := j.session.Client().Do(request) if err != nil { return nil, err } if response.StatusCode != http.StatusOK { decoder := json.NewDecoder(response.Body) defer response.Body.Close() var errs []sfdc.Error err = decoder.Decode(&errs) var errMsg error if err == nil { for _, err := range errs { errMsg = fmt.Errorf("job err: %s: %s", err.ErrorCode, err.Message) } } else { errMsg = fmt.Errorf("job err: %d %s", response.StatusCode, response.Status) } return nil, errMsg } scanner := bufio.NewScanner(response.Body) defer response.Body.Close() scanner.Split(bufio.ScanLines) var records []SuccessfulRecord delimiter := j.delimiter() columns, err := j.recordResultHeader(scanner, delimiter) if err != nil { return nil, err } createIdx, err := j.headerPosition(`sf__Created`, columns) if err != nil { return nil, err } idIdx, err := j.headerPosition(`sf__Id`, columns) if err != nil { return nil, err } fields := j.fields(columns, 2) for scanner.Scan() { var record SuccessfulRecord values := strings.Split(scanner.Text(), delimiter) isCreated := strings.Replace(values[createIdx], "\"", "", -1) created, err := strconv.ParseBool(isCreated) if err != nil { return nil, err } record.Created = created record.ID = values[idIdx] record.Fields = j.record(fields, values[2:]) records = append(records, record) } return records, nil } // FailedRecords returns the failed records for the job. 
func (j *Job) FailedRecords() ([]FailedRecord, error) { url := j.session.ServiceURL() + bulk2Endpoint + "/" + j.info.ID + "/failedResults/" request, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { return nil, err } request.Header.Add("Accept", "text/csv") j.session.AuthorizationHeader(request) response, err := j.session.Client().Do(request) if err != nil { return nil, err } if response.StatusCode != http.StatusOK { decoder := json.NewDecoder(response.Body) defer response.Body.Close() var errs []sfdc.Error err = decoder.Decode(&errs) var errMsg error if err == nil { for _, err := range errs { errMsg = fmt.Errorf("job err: %s: %s", err.ErrorCode, err.Message) } } else { errMsg = fmt.Errorf("job err: %d %s", response.StatusCode, response.Status) } return nil, errMsg } scanner := bufio.NewScanner(response.Body) defer response.Body.Close() scanner.Split(bufio.ScanLines) var records []FailedRecord delimiter := j.delimiter() columns, err := j.recordResultHeader(scanner, delimiter) if err != nil { return nil, err } errorIdx, err := j.headerPosition(`sf__Error`, columns) if err != nil { return nil, err } idIdx, err := j.headerPosition(`sf__Id`, columns) if err != nil { return nil, err } fields := j.fields(columns, 2) for scanner.Scan() { var record FailedRecord values := strings.Split(scanner.Text(), delimiter) record.Error = values[errorIdx] record.ID = values[idIdx] record.Fields = j.record(fields, values[2:]) records = append(records, record) } return records, nil } // UnprocessedRecords returns the unprocessed records for the job. 
func (j *Job) UnprocessedRecords() ([]UnprocessedRecord, error) { url := j.session.ServiceURL() + bulk2Endpoint + "/" + j.info.ID + "/unprocessedrecords/" request, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { return nil, err } request.Header.Add("Accept", "text/csv") j.session.AuthorizationHeader(request) response, err := j.session.Client().Do(request) if err != nil { return nil, err } if response.StatusCode != http.StatusOK { decoder := json.NewDecoder(response.Body) defer response.Body.Close() var errs []sfdc.Error err = decoder.Decode(&errs) var errMsg error if err == nil { for _, err := range errs { errMsg = fmt.Errorf("job err: %s: %s", err.ErrorCode, err.Message) } } else { errMsg = fmt.Errorf("job err: %d %s", response.StatusCode, response.Status) } return nil, errMsg } scanner := bufio.NewScanner(response.Body) defer response.Body.Close() scanner.Split(bufio.ScanLines) var records []UnprocessedRecord delimiter := j.delimiter() columns, err := j.recordResultHeader(scanner, delimiter) if err != nil { return nil, err } fields := j.fields(columns, 0) for scanner.Scan() { var record UnprocessedRecord values := strings.Split(scanner.Text(), delimiter) record.Fields = j.record(fields, values) records = append(records, record) } return records, nil } func (j *Job) recordResultHeader(scanner *bufio.Scanner, delimiter string) ([]string, error) { if scanner.Scan() == false { return nil, errors.New("job: response needs to have header") } text := strings.Replace(scanner.Text(), "\"", "", -1) return strings.Split(text, delimiter), nil } func (j *Job) headerPosition(column string, header []string) (int, error) { for idx, col := range header { if col == column { return idx, nil } } return -1, fmt.Errorf("job header: %s column is not in header", column) } func (j *Job) fields(header []string, offset int) []string { fields := make([]string, len(header)-offset) copy(fields[:], header[offset:]) return fields } func (j *Job) record(fields, values []string) 
map[string]string { record := make(map[string]string) for idx, field := range fields { record[field] = values[idx] } return record } func (j *Job) delimiter() string { switch ColumnDelimiter(j.info.ColumnDelimiter) { case Tab: return "\t" case SemiColon: return ";" case Pipe: return "|" case Caret: return "^" case Backquote: return "`" default: return "," } } func (j *Job) newline() string { switch LineEnding(j.info.LineEnding) { case CarriageReturnLinefeed: return "\r\n" default: return "\n" } }