bayeux.go (285 lines of code) (raw):
package bayeux
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"sync"
"time"
)
type MaybeMsg struct {
Err error
Msg TriggerEvent
}
func (e MaybeMsg) Failed() bool { return e.Err != nil }
func (e MaybeMsg) Error() string { return e.Err.Error() }
// TriggerEvent describes an event received from Bayeaux Endpoint
type TriggerEvent struct {
ClientID string `json:"clientId"`
Data struct {
Event struct {
CreatedDate time.Time `json:"createdDate"`
ReplayID int `json:"replayId"`
Type string `json:"type"`
} `json:"event"`
Object json.RawMessage `json:"sobject"`
Payload json.RawMessage `json:"payload"`
} `json:"data,omitempty"`
Channel string `json:"channel"`
Successful bool `json:"successful,omitempty"`
}
// Status is the state of success and subscribed channels
type status struct {
connected bool
clientID string
channels []string
connectCount int
}
func (st *status) connect() {
st.connectCount++
}
func (st *status) disconnect() {
st.connectCount--
}
type BayeuxHandshake []struct {
Ext struct {
Replay bool `json:"replay"`
} `json:"ext"`
MinimumVersion string `json:"minimumVersion"`
ClientID string `json:"clientId"`
SupportedConnectionTypes []string `json:"supportedConnectionTypes"`
Channel string `json:"channel"`
Version string `json:"version"`
Successful bool `json:"successful"`
}
type Subscription struct {
ClientID string `json:"clientId"`
Channel string `json:"channel"`
Subscription string `json:"subscription"`
Successful bool `json:"successful"`
}
type Credentials struct {
AccessToken string `json:"access_token"`
InstanceURL string `json:"instance_url"`
IssuedAt int
ID string
TokenType string `json:"token_type"`
Signature string
}
func (c Credentials) bayeuxUrl() string {
return c.InstanceURL + "/cometd/38.0"
}
type clientIDAndCookies struct {
clientID string
cookies []*http.Cookie
}
type AuthenticationParameters struct {
ClientID string // consumer key from Salesforce (e.g. 3MVG9pRsdbjsbdjfm1I.fz3f7zBuH4xdKCJcM9B5XLgxXh2AFTmQmr8JMn1vsadjsadjjsadakd_C)
ClientSecret string // consumer secret from Salesforce (e.g. E9FE118633BC7SGDADUHUE81F19C1D4529D09CB7231754AD2F2CA668400619)
Username string // Salesforce user email (e.g. salesforce.user@email.com)
Password string // Salesforce password
TokenURL string // Salesforce token endpoint (e.g. https://login.salesforce.com/services/oauth2/token)
}
// Bayeux struct allow for centralized storage of creds, ids, and cookies
type Bayeux struct {
creds Credentials
id clientIDAndCookies
}
var wg sync.WaitGroup
var logger = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lmicroseconds|log.Lshortfile)
var st = status{false, "", []string{}, 0}
// newHTTPRequest is to create requests with context
func (b *Bayeux) newHTTPRequest(ctx context.Context, body string, route string) (*http.Request, error) {
var jsonStr = []byte(body)
req, err := http.NewRequest("POST", route, bytes.NewBuffer(jsonStr))
if err != nil {
return nil, fmt.Errorf("bad Call request: %w", err)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
req = req.WithContext(ctx)
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", b.creds.AccessToken))
// Per Stackexchange comment, passing back cookies is required though undocumented in Salesforce API
// We were unable to get process working without passing cookies back to SF server.
// SF Reference: https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/intro_client_specs.htm
for _, cookie := range b.id.cookies {
req.AddCookie(cookie)
}
}
return req, nil
}
// Call is the base function for making bayeux requests
func (b *Bayeux) call(ctx context.Context, body string, route string) (resp *http.Response, e error) {
req, err := b.newHTTPRequest(ctx, body, route)
if err != nil {
return nil, err
}
client := &http.Client{}
resp, err = client.Do(req)
if err == io.EOF {
// Right way to handle EOF?
return nil, fmt.Errorf("bad bayeuxCall io.EOF: %w", err)
} else if err != nil {
return nil, fmt.Errorf("bad unrecoverable call: %w", err)
}
return resp, nil
}
func (b *Bayeux) getClientID(ctx context.Context) error {
handshake := `{"channel": "/meta/handshake", "supportedConnectionTypes": ["long-polling"], "version": "1.0"}`
// Stub out clientIDAndCookies for first bayeuxCall
resp, err := b.call(ctx, handshake, b.creds.bayeuxUrl())
if err != nil {
return fmt.Errorf("cannot get client id: %s", err)
}
defer resp.Body.Close()
decoder := json.NewDecoder(resp.Body)
var h BayeuxHandshake
if err := decoder.Decode(&h); err == io.EOF {
return err
} else if err != nil {
return err
}
creds := clientIDAndCookies{h[0].ClientID, resp.Cookies()}
b.id = creds
return nil
}
// ReplayAll replay for past 24 hrs
const ReplayAll = -2
// ReplayNone start playing events at current moment
const ReplayNone = -1
// Replay accepts the following values
// Value
// -2: replay all events from past 24 hrs
// -1: start at current
// >= 0: start from this event number
type Replay struct {
Value int
}
func (b *Bayeux) subscribe(ctx context.Context, channel string, replay string) error {
handshake := fmt.Sprintf(`{
"channel": "/meta/subscribe",
"subscription": "%s",
"clientId": "%s",
"ext": {
"replay": {"%s": "%s"}
}
}`, channel, b.id.clientID, channel, replay)
resp, err := b.call(ctx, handshake, b.creds.bayeuxUrl())
if err != nil {
return fmt.Errorf("cannot subscribe: %w", err)
}
defer resp.Body.Close()
if os.Getenv("DEBUG") != "" {
logger.Printf("Response: %+v", resp)
var b []byte
if resp.Body != nil {
b, _ = ioutil.ReadAll(resp.Body)
}
// Restore the io.ReadCloser to its original state
resp.Body = ioutil.NopCloser(bytes.NewBuffer(b))
// Use the content
s := string(b)
logger.Printf("Response Body: %s", s)
}
if resp.StatusCode > 299 {
return fmt.Errorf("received non 2XX response: %w", err)
}
decoder := json.NewDecoder(resp.Body)
var h []Subscription
if err := decoder.Decode(&h); err == io.EOF {
return err
} else if err != nil {
return err
}
sub := &h[0]
st.connected = sub.Successful
st.clientID = sub.ClientID
st.channels = append(st.channels, channel)
st.connect()
if os.Getenv("DEBUG") != "" {
logger.Printf("Established connection(s): %+v", st)
}
return nil
}
func (b *Bayeux) connect(ctx context.Context, out chan MaybeMsg) chan MaybeMsg {
var waitMsgs sync.WaitGroup
wg.Add(1)
go func() {
defer func() {
waitMsgs.Wait()
close(out)
st.disconnect()
wg.Done()
}()
for {
select {
case <-ctx.Done():
return
default:
postBody := fmt.Sprintf(`{"channel": "/meta/connect", "connectionType": "long-polling", "clientId": "%s"} `, b.id.clientID)
resp, err := b.call(ctx, postBody, b.creds.bayeuxUrl())
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
out <- MaybeMsg{Err: fmt.Errorf("cannot connect to bayeux: %s, trying again", err)}
} else {
if os.Getenv("DEBUG") != "" {
var b []byte
if resp.Body != nil {
b, _ = ioutil.ReadAll(resp.Body)
}
// Restore the io.ReadCloser to its original state
resp.Body = ioutil.NopCloser(bytes.NewBuffer(b))
// Use the content
s := string(b)
logger.Printf("Response Body: %s", s)
}
var x []TriggerEvent
decoder := json.NewDecoder(resp.Body)
if err := decoder.Decode(&x); err != nil && err == io.EOF {
out <- MaybeMsg{Err: err}
return
}
for i := range x {
waitMsgs.Add(1)
go func(e TriggerEvent) {
defer waitMsgs.Done()
out <- MaybeMsg{Msg: e}
}(x[i])
}
}
}
}
}()
return out
}
// GetConnectedCount returns count of subcriptions
func GetConnectedCount() int {
return st.connectCount
}
func GetSalesforceCredentials(ap AuthenticationParameters) (creds *Credentials, err error) {
params := url.Values{"grant_type": {"password"},
"client_id": {ap.ClientID},
"client_secret": {ap.ClientSecret},
"username": {ap.Username},
"password": {ap.Password}}
res, err := http.PostForm(ap.TokenURL, params)
if err != nil {
return nil, err
}
decoder := json.NewDecoder(res.Body)
if err := decoder.Decode(&creds); err == io.EOF {
return nil, err
} else if err != nil {
return nil, err
} else if creds.AccessToken == "" {
return nil, fmt.Errorf("unable to fetch access token: %w", err)
}
return creds, nil
}
func (b *Bayeux) Channel(ctx context.Context, out chan MaybeMsg, r string, creds Credentials, channel string) chan MaybeMsg {
b.creds = creds
err := b.getClientID(ctx)
if err != nil {
out <- MaybeMsg{Err: err}
close(out)
return out
}
err = b.subscribe(ctx, channel, r)
if err != nil {
out <- MaybeMsg{Err: err}
close(out)
return out
}
c := b.connect(ctx, out)
return c
}