in bayeux.go [194:241]
func (b *Bayeux) subscribe(ctx context.Context, channel string, replay string) error {
handshake := fmt.Sprintf(`{
"channel": "/meta/subscribe",
"subscription": "%s",
"clientId": "%s",
"ext": {
"replay": {"%s": "%s"}
}
}`, channel, b.id.clientID, channel, replay)
resp, err := b.call(ctx, handshake, b.creds.bayeuxUrl())
if err != nil {
return fmt.Errorf("cannot subscribe: %w", err)
}
defer resp.Body.Close()
if os.Getenv("DEBUG") != "" {
logger.Printf("Response: %+v", resp)
var b []byte
if resp.Body != nil {
b, _ = ioutil.ReadAll(resp.Body)
}
// Restore the io.ReadCloser to its original state
resp.Body = ioutil.NopCloser(bytes.NewBuffer(b))
// Use the content
s := string(b)
logger.Printf("Response Body: %s", s)
}
if resp.StatusCode > 299 {
return fmt.Errorf("received non 2XX response: %w", err)
}
decoder := json.NewDecoder(resp.Body)
var h []Subscription
if err := decoder.Decode(&h); err == io.EOF {
return err
} else if err != nil {
return err
}
sub := &h[0]
st.connected = sub.Successful
st.clientID = sub.ClientID
st.channels = append(st.channels, channel)
st.connect()
if os.Getenv("DEBUG") != "" {
logger.Printf("Established connection(s): %+v", st)
}
return nil
}