in plugins/inputs/socket_listener/socket_listener.go [289:410]
// Start opens the socket described by sl.ServiceAddress and launches its
// listen loop in a goroutine tracked by sl.wg.
//
// ServiceAddress must have the form "<protocol>://<address>". Stream
// protocols (tcp, tcp4, tcp6, unix, unixpacket) are served through a
// streamSocketListener, optionally wrapped in TLS when sl.ServerConfig yields
// a TLS configuration. Packet protocols (udp, udp4, udp6, ip, ip4, ip6,
// unixgram) are served through a packetSocketListener. For unix-domain
// sockets, sl.SocketMode (an octal string), when set, is applied to the socket
// file after it has been created.
//
// On success sl.Closer is set to the value that shuts the socket down. An
// error is returned if the address is malformed, the protocol is unknown, the
// TLS config or content decoder cannot be built, listening fails, or the
// socket mode cannot be parsed or applied. In the last case the freshly opened
// socket is closed again before returning so that it does not leak.
func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
	sl.Accumulator = acc

	spl := strings.SplitN(sl.ServiceAddress, "://", 2)
	if len(spl) != 2 {
		return fmt.Errorf("invalid service address: %s", sl.ServiceAddress)
	}
	protocol := spl[0]
	addr := spl[1]

	isUnix := protocol == "unix" || protocol == "unixpacket" || protocol == "unixgram"
	if isUnix {
		// no good way of testing for "file does not exist".
		// Instead just ignore error and blow up when we try to listen, which will
		// indicate "address already in use" if file existed and we couldn't remove.
		//nolint:errcheck,revive
		os.Remove(addr)
	}

	// applySocketMode sets the configured permissions on the unix socket file.
	// It is a no-op for non-unix protocols or when no mode is configured.
	applySocketMode := func() error {
		if !isUnix || sl.SocketMode == "" {
			return nil
		}
		// Convert from octal in string to int
		mode, err := strconv.ParseUint(sl.SocketMode, 8, 32)
		if err != nil {
			return err
		}
		return os.Chmod(addr, os.FileMode(uint32(mode)))
	}

	switch protocol {
	case "tcp", "tcp4", "tcp6", "unix", "unixpacket":
		tlsCfg, err := sl.ServerConfig.TLSConfig()
		if err != nil {
			return err
		}

		var l net.Listener
		if tlsCfg == nil {
			l, err = net.Listen(protocol, addr)
		} else {
			l, err = tls.Listen(protocol, addr, tlsCfg)
		}
		if err != nil {
			return err
		}

		sl.Log.Infof("Listening on %s://%s", protocol, l.Addr())

		// Set permissions on socket
		if err := applySocketMode(); err != nil {
			// sl.Closer has not been assigned yet, so nobody else can release
			// the listener: close it here and best-effort remove the socket
			// file (applySocketMode only fails for unix sockets).
			//nolint:errcheck,revive
			l.Close()
			//nolint:errcheck,revive
			os.Remove(addr)
			return err
		}

		ssl := &streamSocketListener{
			Listener:       l,
			SocketListener: sl,
			sockType:       protocol,
		}

		sl.Closer = ssl
		sl.wg = sync.WaitGroup{}
		sl.wg.Add(1)
		go func() {
			defer sl.wg.Done()
			ssl.listen()
		}()
	case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
		decoder, err := internal.NewContentDecoder(sl.ContentEncoding)
		if err != nil {
			return err
		}

		pc, err := udpListen(protocol, addr)
		if err != nil {
			return err
		}

		// Set permissions on socket
		if err := applySocketMode(); err != nil {
			// Same reasoning as for stream sockets: release the connection
			// and the socket file before reporting the failure.
			//nolint:errcheck,revive
			pc.Close()
			//nolint:errcheck,revive
			os.Remove(addr)
			return err
		}

		if sl.ReadBufferSize > 0 {
			// Not every packet connection type supports resizing its read
			// buffer; failures here are logged but are not fatal.
			if srb, ok := pc.(setReadBufferer); ok {
				if err := srb.SetReadBuffer(int(sl.ReadBufferSize)); err != nil {
					sl.Log.Warnf("Setting read buffer on a %s socket failed: %v", protocol, err)
				}
			} else {
				sl.Log.Warnf("Unable to set read buffer on a %s socket", protocol)
			}
		}

		sl.Log.Infof("Listening on %s://%s", protocol, pc.LocalAddr())

		psl := &packetSocketListener{
			PacketConn:     pc,
			SocketListener: sl,
			decoder:        decoder,
		}

		sl.Closer = psl
		sl.wg = sync.WaitGroup{}
		sl.wg.Add(1)
		go func() {
			defer sl.wg.Done()
			psl.listen()
		}()
	default:
		return fmt.Errorf("unknown protocol '%s' in '%s'", protocol, sl.ServiceAddress)
	}

	if isUnix {
		// Wrap the closer together with the socket path.
		// NOTE(review): presumably unixCloser removes the socket file on
		// Close — confirm against its definition.
		sl.Closer = unixCloser{path: addr, closer: sl.Closer}
	}

	return nil
}