in bayeux.go [243:295]
// connect starts a background goroutine that repeatedly issues Bayeux
// "/meta/connect" long-polling requests and forwards what it receives on out.
//
// Each decoded TriggerEvent is delivered as MaybeMsg{Msg: e} from its own
// goroutine, so the poll loop is never blocked by a slow receiver; as a
// consequence, delivery order is not guaranteed. Failures are delivered as
// MaybeMsg{Err: err}.
//
// The loop ends when ctx is done, when b.call fails with context.Canceled, or
// when a response body cannot be decoded. On exit the goroutine waits for all
// pending event sends, closes out, calls st.disconnect() and marks itself
// done on wg (st and wg are declared outside this function). Because sends on
// out are unconditional, the receiver must keep draining out until it is
// closed, otherwise shutdown blocks.
//
// The returned channel is out itself.
func (b *Bayeux) connect(ctx context.Context, out chan MaybeMsg) chan MaybeMsg {
	var waitMsgs sync.WaitGroup
	wg.Add(1)
	go func() {
		defer func() {
			// All in-flight senders must finish before out is closed;
			// a send on a closed channel would panic.
			waitMsgs.Wait()
			close(out)
			st.disconnect()
			wg.Done()
		}()
		for {
			// Non-blocking cancellation check before every poll.
			select {
			case <-ctx.Done():
				return
			default:
			}

			postBody := fmt.Sprintf(`{"channel": "/meta/connect", "connectionType": "long-polling", "clientId": "%s"} `, b.id.clientID)
			resp, err := b.call(ctx, postBody, b.creds.bayeuxUrl())
			if err != nil {
				if errors.Is(err, context.Canceled) {
					return
				}
				// %w keeps the cause inspectable with errors.Is/As; the
				// rendered message is unchanged.
				// NOTE(review): the retry happens immediately, with no delay
				// visible here; confirm b.call cannot fail fast in a loop, or
				// consider a backoff.
				out <- MaybeMsg{Err: fmt.Errorf("cannot connect to bayeux: %w, trying again", err)}
				continue
			}

			events, err := readTriggerEvents(resp.Body)
			if err != nil {
				out <- MaybeMsg{Err: err}
				return
			}
			for i := range events {
				waitMsgs.Add(1)
				// The event is passed by value so each goroutine owns its copy.
				go func(e TriggerEvent) {
					defer waitMsgs.Done()
					out <- MaybeMsg{Msg: e}
				}(events[i])
			}
		}
	}()
	return out
}

// readTriggerEvents decodes a JSON array of TriggerEvent from body and always
// closes body before returning. A nil body is treated as an empty one.
//
// An empty body (io.EOF from the decoder) is not an error and yields no
// events; any other decode failure is returned. When the DEBUG environment
// variable is non-empty, the raw payload is logged before it is decoded.
func readTriggerEvents(body io.ReadCloser) ([]TriggerEvent, error) {
	if body == nil {
		body = ioutil.NopCloser(bytes.NewReader(nil))
	}
	// Closing on every path releases the underlying response resources.
	defer body.Close()

	var src io.Reader = body
	if os.Getenv("DEBUG") != "" {
		// Best-effort debug dump: a read error only truncates the logged
		// payload, and the decoder below then reports the malformed input.
		raw, _ := ioutil.ReadAll(body)
		logger.Printf("Response Body: %s", string(raw))
		// Decode from the buffered copy, since body has been consumed.
		src = bytes.NewBuffer(raw)
	}

	var events []TriggerEvent
	if err := json.NewDecoder(src).Decode(&events); err != nil && !errors.Is(err, io.EOF) {
		return nil, err
	}
	return events, nil
}