func getOptionsProducerFromProperties()

in plugin/connector/rocketmq/client/rocketmq_producer.go [108:155]


func getOptionsProducerFromProperties(properties map[string]string) ([]producer.Option, error) {
	clientConfig, err := getClientConfigFromProperties(properties)
	if clientConfig == nil {
		return nil, err
	}
	options := make([]producer.Option, 0)

	accessPoints := clientConfig.AccessPoints
	if len(accessPoints) == 0 {
		return nil, errors.New("fail to parse rocketmq producer config, invalid access points")
	}

	// name server address
	options = append(options, producer.WithNameServer(strings.Split(accessPoints, ",")))

	// instance name
	producerId := utils.GetInstanceName()
	options = append(options, producer.WithInstanceName(producerId))
	clientConfig.InstanceName = producerId

	// producer group name
	options = append(options, producer.WithGroupName(clientConfig.ProducerGroupName))

	// send msg timeout
	if len(clientConfig.SendMsgTimeout) != 0 {
		sendMsgTimeout, err := strconv.Atoi(clientConfig.SendMsgTimeout)
		if err == nil {
			options = append(options, producer.WithSendMsgTimeout(time.Duration(sendMsgTimeout)*time.Millisecond))
		}
	}

	// producer retry time
	if len(clientConfig.ProducerRetryTimes) != 0 {
		producerRetryTimes, err := strconv.Atoi(clientConfig.ProducerRetryTimes)
		if err == nil {
			options = append(options, producer.WithRetry(producerRetryTimes))
		}
	}

	// producer compress message body threshold
	if len(clientConfig.CompressMsgBodyThreshold) != 0 {
		compressMsgBodyThreshold, err := strconv.Atoi(clientConfig.CompressMsgBodyThreshold)
		if err == nil {
			options = append(options, producer.WithCompressMsgBodyOverHowmuch(compressMsgBodyThreshold))
		}
	}
	return options, nil
}