func()

in rocketmq-beats-integration/libbeat/outputs/rocketmq/client.go [101:153]


// publishEvent encodes a single event with the configured codec and hands it
// to the RocketMQ producer via SendAsync, addressed to c.topic.
//
// It returns true when SendAsync accepted the message without error, and false
// when encoding failed or SendAsync returned an error. A true result only means
// the message was submitted: the final outcome is reported by the callback
// passed to SendAsync, which records it on the observer (Acked on success,
// Dropped on error).
func (c *client) publishEvent(event *publisher.Event) bool {
	serializedEvent, err := c.codec.Encode(c.index, &event.Content)
	if err != nil {
		c.observer.Dropped(1)
		// Encoding failures are logged only for guaranteed events; any other
		// event is dropped without a log entry.
		if event.Guaranteed() {
			c.log.Errorf("Unable to encode event: %v", err)
		}
		return false
	}

	// The send is asynchronous, so the message gets its own copy of the payload
	// instead of the slice returned by the codec.
	// NOTE(review): presumably the codec reuses its output buffer between
	// Encode calls — confirm against the codec implementation.
	body := make([]byte, len(serializedEvent))
	copy(body, serializedEvent)

	msg := &primitive.Message{
		Topic: c.topic,
		Body:  body,
	}

	// onResult records the outcome of the asynchronous send on the observer.
	onResult := func(_ context.Context, result *primitive.SendResult, sendErr error) {
		if sendErr != nil {
			c.observer.Dropped(1)
			c.log.Errorf("send to rocketmq is error %v", sendErr)
			return
		}
		c.observer.Acked(1)
		c.log.Debugf("send msg result=%v", result.String())
	}

	if err := c.producer.SendAsync(context.Background(), onResult, msg); err != nil {
		c.observer.Dropped(1)
		c.log.Errorf("send to rocketmq is error %v", err)
		return false
	}

	// Count exactly the payload bytes handed to the producer: nothing (such as
	// a trailing newline) is appended to the message body, and payloads that
	// SendAsync rejected above are not counted as written.
	c.observer.WriteBytes(len(body))

	return true
}