in plugin/connector/rocketmq/client/rocketmq_producer.go [108:155]
func getOptionsProducerFromProperties(properties map[string]string) ([]producer.Option, error) {
clientConfig, err := getClientConfigFromProperties(properties)
if clientConfig == nil {
return nil, err
}
options := make([]producer.Option, 0)
accessPoints := clientConfig.AccessPoints
if len(accessPoints) == 0 {
return nil, errors.New("fail to parse rocketmq producer config, invalid access points")
}
// name server address
options = append(options, producer.WithNameServer(strings.Split(accessPoints, ",")))
// instance name
producerId := utils.GetInstanceName()
options = append(options, producer.WithInstanceName(producerId))
clientConfig.InstanceName = producerId
// producer group name
options = append(options, producer.WithGroupName(clientConfig.ProducerGroupName))
// send msg timeout
if len(clientConfig.SendMsgTimeout) != 0 {
sendMsgTimeout, err := strconv.Atoi(clientConfig.SendMsgTimeout)
if err == nil {
options = append(options, producer.WithSendMsgTimeout(time.Duration(sendMsgTimeout)*time.Millisecond))
}
}
// producer retry time
if len(clientConfig.ProducerRetryTimes) != 0 {
producerRetryTimes, err := strconv.Atoi(clientConfig.ProducerRetryTimes)
if err == nil {
options = append(options, producer.WithRetry(producerRetryTimes))
}
}
// producer compress message body threshold
if len(clientConfig.CompressMsgBodyThreshold) != 0 {
compressMsgBodyThreshold, err := strconv.Atoi(clientConfig.CompressMsgBodyThreshold)
if err == nil {
options = append(options, producer.WithCompressMsgBodyOverHowmuch(compressMsgBodyThreshold))
}
}
return options, nil
}