plugins/input/mqtt/input_mqtt.go (215 lines of code) (raw):

// Copyright 2021 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package mqtt import ( "crypto/tls" "fmt" "strconv" "sync" "time" MQTT "github.com/eclipse/paho.mqtt.golang" "github.com/pkg/errors" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/pkg/util" ) // ServiceMQTT ... type ServiceMQTT struct { shutdown chan struct{} waitGroup sync.WaitGroup context pipeline.Context collector pipeline.Collector keys []string id int Server string Topics []string QoS int ClientID string Username string Password string SSLCA string // Path to CA file SSLCert string // Path to host cert file SSLKey string // Path to cert key file RetryMin int RetryRatio float64 RetryMax int CleanSession bool OrderMatters bool ClientIDAutoInc bool KeepAlive int Version int // 3 - MQTT 3.1 or 4 - MQTT 3.1.1 } type DebugLogger struct { } func (DebugLogger) Println(v ...interface{}) { fmt.Println(v...) } func (DebugLogger) Printf(format string, v ...interface{}) { fmt.Printf(format, v...) } func (p *ServiceMQTT) Init(context pipeline.Context) (int, error) { p.context = context p.id = int(time.Now().Unix() % 100000) if len(p.Topics) == 0 { p.Topics = append(p.Topics, "#") } if len(p.Server) == 0 { p.Server = "tcp://127.0.0.1:1883" } if len(p.ClientID) == 0 { p.ClientID = util.GetHostName() + "_" + strconv.Itoa(time.Now().Second()) } p.keys = append(p.keys, "server") p.keys = append(p.keys, "topic") p.keys = append(p.keys, "duplicated") p.keys = append(p.keys, "retained") p.keys = append(p.keys, "message_id") p.keys = append(p.keys, "content") return 0, nil } func (p *ServiceMQTT) Description() string { return "mqtt service input plugin for logtail" } // Collect takes in an accumulator and adds the metrics that the Input // gathers. This is called every "interval" func (p *ServiceMQTT) Collect(pipeline.Collector) error { return nil } func (p *ServiceMQTT) onMessageReceived(client MQTT.Client, message MQTT.Message) { logger.Debug(p.context.GetRuntimeContext(), "Received message on topic", message.Topic()) values := make([]string, 6) values[0] = p.Server values[1] = message.Topic() if message.Duplicate() { values[2] = "true" } else { values[2] = "false" } if message.Retained() { values[3] = "true" } else { values[3] = "false" } values[4] = strconv.Itoa(int(message.MessageID())) values[5] = string(message.Payload()) p.collector.AddDataArray(nil, p.keys, values) } func (p *ServiceMQTT) createClient(tlsConfig *tls.Config, connLostChannel chan struct{}) (MQTT.Client, error) { connOpts := MQTT.NewClientOptions().AddBroker(p.Server).SetCleanSession(p.CleanSession) if tlsConfig != nil { connOpts.SetTLSConfig(tlsConfig) } if len(p.Username) != 0 { connOpts.SetUsername(p.Username) } if len(p.Password) != 0 { connOpts.SetPassword(p.Password) } if p.ClientIDAutoInc { connOpts.SetAutoReconnect(false) connOpts.SetClientID(p.ClientID + strconv.Itoa(p.id)) p.id++ } else { connOpts.SetAutoReconnect(true) connOpts.SetClientID(p.ClientID) } logger.Info(p.context.GetRuntimeContext(), "begin connect", p.Server, "client", connOpts.ClientID, "detail", *connOpts) // @note work around for topic filter's bug connOpts.SetDefaultPublishHandler(p.onMessageReceived) connOpts.SetProtocolVersion(uint(p.Version)) connOpts.OnConnect = func(c MQTT.Client) { } connOpts.OnConnectionLost = func(c MQTT.Client, err error) { logger.Error(p.context.GetRuntimeContext(), "MQTT_CONNECTION_LOST_ALARM", "connection lost", p.Server, "error", err) if connLostChannel != nil { connLostChannel <- struct{}{} } } client := MQTT.NewClient(connOpts) waitTime := time.Duration(p.RetryMin) * time.Second var err error connSuccess := false for { if token := client.Connect(); token.Wait() && token.Error() != nil { logger.Warning(p.context.GetRuntimeContext(), "MQTT_CONNECT_ALARM", "connect", p.Server, "error", token.Error()) if util.RandomSleep(waitTime, 0.1, p.shutdown) { err = errors.New("MQTT connect failed") break } waitTime = time.Duration(p.RetryRatio * float64(waitTime)) if waitTime > time.Second*time.Duration(p.RetryMax) { waitTime = time.Second * time.Duration(p.RetryMax) } } else { logger.Info(p.context.GetRuntimeContext(), "connected to", p.Server) connSuccess = true multiTopics := make(map[string]byte) for _, topic := range p.Topics { multiTopics[topic] = byte(p.QoS) } if token := client.SubscribeMultiple(multiTopics, nil); token.Wait() && token.Error() != nil { logger.Error(p.context.GetRuntimeContext(), "MQTT_SUBSCRIBE_ALARM", "subscribe topic", multiTopics, "error", token.Error()) } else { logger.Info(p.context.GetRuntimeContext(), "subscribe success, topic", multiTopics) } break } } if connSuccess { return client, nil } return nil, err } // Start starts the ServiceInput's service, whatever that may be func (p *ServiceMQTT) Start(c pipeline.Collector) error { p.shutdown = make(chan struct{}) p.collector = c p.waitGroup.Add(1) defer p.waitGroup.Done() var tlsConfig *tls.Config var err error if len(p.SSLCA) > 0 || len(p.SSLCert) > 0 || len(p.SSLKey) > 0 { tlsConfig, err = util.GetTLSConfig(p.SSLCert, p.SSLKey, p.SSLCA, true) if err != nil { return err } } if p.ClientIDAutoInc { waitTime := time.Duration(p.RetryMin) * time.Second for { closeChan := make(chan struct{}, 1) client, _ := p.createClient(tlsConfig, closeChan) if client != nil { waitTime = time.Duration(p.RetryMin) * time.Second select { case <-closeChan: close(closeChan) break case <-p.shutdown: client.Disconnect(1) return nil } } waitTime = time.Duration(p.RetryRatio * float64(waitTime)) if waitTime > time.Second*time.Duration(p.RetryMax) { waitTime = time.Second * time.Duration(p.RetryMax) } time.Sleep(waitTime) } } else { client, _ := p.createClient(tlsConfig, nil) if client != nil { <-p.shutdown client.Disconnect(1) } } return nil } // Stop stops the services and closes any necessary channels and connections func (p *ServiceMQTT) Stop() error { close(p.shutdown) p.waitGroup.Wait() return nil } func init() { pipeline.ServiceInputs["service_mqtt"] = func() pipeline.ServiceInput { return &ServiceMQTT{ RetryMin: 1, RetryRatio: 2.0, RetryMax: 300, KeepAlive: 30, OrderMatters: true, CleanSession: false, } } }