in rocketmq-beats-integration/libbeat/outputs/rocketmq/client.go [101:153]
func (c *client) publishEvent(event *publisher.Event) bool {
serializedEvent, err := c.codec.Encode(c.index, &event.Content)
if err != nil {
c.observer.Dropped(1)
if !event.Guaranteed() {
return false
}
c.log.Errorf("Unable to encode event: %v", err)
return false
}
c.observer.WriteBytes(len(serializedEvent) + 1)
// str := string(serializedEvent)
// c.log.Warnf("Processing event: %v", str)
buf := make([]byte, len(serializedEvent))
copy(buf, serializedEvent)
msg := &primitive.Message{
Topic: c.topic,
Body: buf,
}
// res, err := c.producer.SendSync(context.Background(), msg)
// if err != nil {
// c.log.Errorf("send to rocketmq is error %v", err)
// return false
// } else {
// c.log.Warnf("send msg result=%v", res.String())
// }
err = c.producer.SendAsync(context.Background(),
func(ctx context.Context, result *primitive.SendResult, e error) {
if e != nil {
c.observer.Dropped(1)
c.log.Errorf("send to rocketmq is error %v", e)
} else {
c.observer.Acked(1)
c.log.Debugf("send msg result=%v", result.String())
}
}, msg)
if err != nil {
c.observer.Dropped(1)
c.log.Errorf("send to rocketmq is error %v", err)
return false
}
return true
}