sobject/query.go (251 lines of code) (raw):
package sobject
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"github.com/elastic/go-sfdc"
"github.com/elastic/go-sfdc/session"
)
// Querier is the interface used to query a SObject from
// Salesforce.
//
// SObject is the table object in Salesforce, like Account.
//
// ID is the Salesforce ID of the table object to retrieve.
//
// Fields is the fields to be returned. If the field array
// is empty, the all of the fields will be returned.
type Querier interface {
SObject() string
ID() string
Fields() []string
}
// ExternalQuerier is the interface used to query a SObject from
// Salesforce using an external ID.
//
// SObject is the table object in Salesforce, like Account.
//
// ID is the external ID of the table object to retrieve.
//
// Fields is the fields to be returned. If the field array
// is empty, the all of the fields will be returned.
//
// ExternalField is the external field on the sobject.
type ExternalQuerier interface {
Querier
ExternalField() string
}
type deletedRecord struct {
ID string `json:"id"`
DeletedDateStr string `json:"deletedDate"`
DeletedDate time.Time `json:"-"`
}
// DeletedRecords is the return structure listing the deleted records.
type DeletedRecords struct {
Records []deletedRecord `json:"deletedRecords"`
EarliestDateStr string `json:"earliestDateAvailable"`
LatestDateStr string `json:"latestDateCovered"`
EarliestDate time.Time `json:"-"`
LatestDate time.Time `json:"-"`
}
// UpdatedRecords is the return structure listing the updated records.
type UpdatedRecords struct {
Records []string `json:"ids"`
LatestDateStr string `json:"latestDateCovered"`
LatestDate time.Time `json:"-"`
}
// ContentType is indicator of the content type in Salesforce blob.
type ContentType string
const (
// AttachmentType is the content blob from the Salesforce Attachment record.
AttachmentType ContentType = "Attachment"
// DocumentType is the content blob from the Salesforce Document record.
DocumentType ContentType = "Document"
)
const deletedRoute = "deleted"
const updatedRoute = "updated"
const contentBody = "body"
type query struct {
session session.ServiceFormatter
}
func (q *query) callout(querier Querier) (*sfdc.Record, error) {
request, err := q.queryRequest(querier)
if err != nil {
return nil, err
}
value, err := q.queryResponse(request)
if err != nil {
return nil, err
}
return value, nil
}
func (q *query) queryRequest(querier Querier) (*http.Request, error) {
queryURL := q.session.ServiceURL() + objectEndpoint + querier.SObject() + "/" + querier.ID()
if len(querier.Fields()) > 0 {
fields := strings.Join(querier.Fields(), ",")
form := url.Values{}
form.Add("fields", fields)
queryURL += "?" + form.Encode()
}
request, err := http.NewRequest(http.MethodGet, queryURL, nil)
if err != nil {
return nil, err
}
request.Header.Add("Accept", "application/json")
q.session.AuthorizationHeader(request)
return request, nil
}
func (q *query) queryResponse(request *http.Request) (*sfdc.Record, error) {
response, err := q.session.Client().Do(request)
if err != nil {
return nil, err
}
decoder := json.NewDecoder(response.Body)
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
var queryErrs []sfdc.Error
err = decoder.Decode(&queryErrs)
var errMsg error
if err == nil {
for _, queryErr := range queryErrs {
errMsg = fmt.Errorf("insert response err: %s: %s", queryErr.ErrorCode, queryErr.Message)
}
} else {
errMsg = fmt.Errorf("insert response err: %d %s", response.StatusCode, response.Status)
}
return nil, errMsg
}
var record sfdc.Record
err = decoder.Decode(&record)
if err != nil {
return nil, err
}
return &record, nil
}
func (q *query) externalCallout(querier ExternalQuerier) (*sfdc.Record, error) {
request, err := q.externalQueryRequest(querier)
if err != nil {
return nil, err
}
value, err := q.queryResponse(request)
if err != nil {
return nil, err
}
return value, nil
}
func (q *query) externalQueryRequest(querier ExternalQuerier) (*http.Request, error) {
queryURL := q.session.ServiceURL() + objectEndpoint + querier.SObject() + "/" + querier.ExternalField() + "/" + querier.ID()
if len(querier.Fields()) > 0 {
fields := strings.Join(querier.Fields(), ",")
form := url.Values{}
form.Add("fields", fields)
queryURL += "?" + form.Encode()
}
request, err := http.NewRequest(http.MethodGet, queryURL, nil)
if err != nil {
return nil, err
}
request.Header.Add("Accept", "application/json")
q.session.AuthorizationHeader(request)
return request, nil
}
func (q *query) deletedRecordsCallout(sobject string, startDate, endDate time.Time) (DeletedRecords, error) {
request, err := q.operationRequest(sobject, deletedRoute, startDate, endDate)
if err != nil {
return DeletedRecords{}, err
}
value, err := q.deletedRecordsResponse(request)
if err != nil {
return DeletedRecords{}, err
}
return value, nil
}
func (q *query) deletedRecordsResponse(request *http.Request) (DeletedRecords, error) {
response, err := q.session.Client().Do(request)
if err != nil {
return DeletedRecords{}, err
}
decoder := json.NewDecoder(response.Body)
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return DeletedRecords{}, fmt.Errorf("deleted records response err: %d %s", response.StatusCode, response.Status)
}
var records DeletedRecords
err = decoder.Decode(&records)
if err != nil {
return DeletedRecords{}, err
}
for idx, record := range records.Records {
date, err := sfdc.ParseTime(record.DeletedDateStr)
if err != nil {
return DeletedRecords{}, err
}
records.Records[idx].DeletedDate = date
}
var date time.Time
date, err = sfdc.ParseTime(records.EarliestDateStr)
if err != nil {
return DeletedRecords{}, err
}
records.EarliestDate = date
date, err = sfdc.ParseTime(records.LatestDateStr)
if err != nil {
return DeletedRecords{}, err
}
records.LatestDate = date
return records, nil
}
func (q *query) updatedRecordsCallout(sobject string, startDate, endDate time.Time) (UpdatedRecords, error) {
request, err := q.operationRequest(sobject, updatedRoute, startDate, endDate)
if err != nil {
return UpdatedRecords{}, err
}
value, err := q.updatedRecordsResponse(request)
if err != nil {
return UpdatedRecords{}, err
}
return value, nil
}
func (q *query) updatedRecordsResponse(request *http.Request) (UpdatedRecords, error) {
response, err := q.session.Client().Do(request)
if err != nil {
return UpdatedRecords{}, err
}
decoder := json.NewDecoder(response.Body)
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return UpdatedRecords{}, fmt.Errorf("deleted records response err: %d %s", response.StatusCode, response.Status)
}
var records UpdatedRecords
err = decoder.Decode(&records)
if err != nil {
return UpdatedRecords{}, err
}
date, err := sfdc.ParseTime(records.LatestDateStr)
if err != nil {
return UpdatedRecords{}, err
}
records.LatestDate = date
return records, nil
}
func (q *query) operationRequest(sobject, operation string, startDate, endDate time.Time) (*http.Request, error) {
form := url.Values{}
form.Add("start", startDate.Format(time.RFC3339))
form.Add("end", endDate.Format(time.RFC3339))
dateRange := "?" + form.Encode()
queryURL := q.session.ServiceURL() + objectEndpoint + sobject + "/" + operation + "/" + dateRange
request, err := http.NewRequest(http.MethodGet, queryURL, nil)
if err != nil {
return nil, err
}
request.Header.Add("Accept", "application/json")
q.session.AuthorizationHeader(request)
return request, nil
}
func (q *query) contentCallout(id string, content ContentType) ([]byte, error) {
request, err := q.contentRequest(id, content)
if err != nil {
return nil, err
}
return q.contentResponse(request)
}
func (q *query) contentRequest(id string, content ContentType) (*http.Request, error) {
queryURL := q.session.ServiceURL() + objectEndpoint + string(content) + "/" + id + "/" + contentBody
request, err := http.NewRequest(http.MethodGet, queryURL, nil)
if err != nil {
return nil, err
}
q.session.AuthorizationHeader(request)
return request, nil
}
func (q *query) contentResponse(request *http.Request) ([]byte, error) {
response, err := q.session.Client().Do(request)
if err != nil {
return nil, err
}
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("deleted records response err: %d %s", response.StatusCode, response.Status)
}
body, err := ioutil.ReadAll(response.Body)
defer response.Body.Close()
return body, err
}