func()

in bayeux.go [194:241]


func (b *Bayeux) subscribe(ctx context.Context, channel string, replay string) error {
	handshake := fmt.Sprintf(`{
								"channel": "/meta/subscribe",
								"subscription": "%s",
								"clientId": "%s",
								"ext": {
									"replay": {"%s": "%s"}
									}
								}`, channel, b.id.clientID, channel, replay)
	resp, err := b.call(ctx, handshake, b.creds.bayeuxUrl())
	if err != nil {
		return fmt.Errorf("cannot subscribe: %w", err)
	}

	defer resp.Body.Close()
	if os.Getenv("DEBUG") != "" {
		logger.Printf("Response: %+v", resp)
		var b []byte
		if resp.Body != nil {
			b, _ = ioutil.ReadAll(resp.Body)
		}
		// Restore the io.ReadCloser to its original state
		resp.Body = ioutil.NopCloser(bytes.NewBuffer(b))
		// Use the content
		s := string(b)
		logger.Printf("Response Body: %s", s)
	}

	if resp.StatusCode > 299 {
		return fmt.Errorf("received non 2XX response: %w", err)
	}
	decoder := json.NewDecoder(resp.Body)
	var h []Subscription
	if err := decoder.Decode(&h); err == io.EOF {
		return err
	} else if err != nil {
		return err
	}
	sub := &h[0]
	st.connected = sub.Successful
	st.clientID = sub.ClientID
	st.channels = append(st.channels, channel)
	st.connect()
	if os.Getenv("DEBUG") != "" {
		logger.Printf("Established connection(s): %+v", st)
	}
	return nil
}