func()

in bayeux.go [243:295]


// connect starts the long-polling loop against the Bayeux /meta/connect
// endpoint and returns out, the channel on which results are delivered.
//
// A single background goroutine repeatedly posts a connect request until ctx
// is done. Every decoded TriggerEvent is forwarded on out from its own
// goroutine; failures are delivered as MaybeMsg values with Err set:
//
//   - a call error that is context.Canceled stops the loop silently;
//   - any other call error is reported (wrapped with %w) and the request is
//     retried;
//   - io.EOF while decoding (an empty response body) is reported and stops
//     the loop;
//   - any other decode error is reported and polling continues.
//
// When the loop stops, the goroutine waits for in-flight event sends, closes
// out, calls st.disconnect() and calls wg.Done() to balance the wg.Add(1)
// performed on entry. Sends on out are not guarded by ctx, so callers should
// keep receiving until out is closed.
func (b *Bayeux) connect(ctx context.Context, out chan MaybeMsg) chan MaybeMsg {
	// waitMsgs tracks the per-event sender goroutines so that out is only
	// closed once none of them can still write to it.
	var waitMsgs sync.WaitGroup
	wg.Add(1)
	go func() {
		defer func() {
			waitMsgs.Wait()
			close(out)
			st.disconnect()
			wg.Done()
		}()
		for {
			// Non-blocking cancellation check before each poll.
			select {
			case <-ctx.Done():
				return
			default:
			}

			postBody := fmt.Sprintf(`{"channel": "/meta/connect", "connectionType": "long-polling", "clientId": "%s"} `, b.id.clientID)
			resp, err := b.call(ctx, postBody, b.creds.bayeuxUrl())
			if err != nil {
				if errors.Is(err, context.Canceled) {
					return
				}
				// NOTE(review): the retry is immediate, with no backoff.
				out <- MaybeMsg{Err: fmt.Errorf("cannot connect to bayeux: %w, trying again", err)}
				continue
			}

			// A missing body is treated like an empty one, which surfaces as
			// io.EOF from the decoder below.
			var src io.Reader = bytes.NewReader(nil)
			if resp.Body != nil {
				src = resp.Body
			}
			if os.Getenv("DEBUG") != "" {
				// Best-effort debug dump: a read error leaves raw truncated,
				// which the decoder then reports as a decode error.
				raw, _ := ioutil.ReadAll(src)
				logger.Printf("Response Body: %s", raw)
				// Decode from the buffered copy since the stream is consumed.
				src = bytes.NewReader(raw)
			}

			var events []TriggerEvent
			decodeErr := json.NewDecoder(src).Decode(&events)
			if resp.Body != nil {
				// Release the connection now rather than via defer, which
				// would only run when the whole loop exits. The close error
				// carries no actionable information once decoding is done.
				_ = resp.Body.Close()
			}
			if decodeErr != nil {
				// Surface every decode failure; previously anything other
				// than io.EOF was silently ignored.
				out <- MaybeMsg{Err: decodeErr}
				if errors.Is(decodeErr, io.EOF) {
					return
				}
			}

			// Forward whatever was decoded. Each send gets its own goroutine
			// so a slow consumer does not delay the next poll.
			for i := range events {
				waitMsgs.Add(1)
				go func(e TriggerEvent) {
					defer waitMsgs.Done()
					out <- MaybeMsg{Msg: e}
				}(events[i])
			}
		}
	}()
	return out
}