func()

in plugins/inputs/socket_listener/socket_listener.go [289:410]


// Start stores acc on the listener and begins listening on sl.ServiceAddress.
//
// ServiceAddress must have the form "<protocol>://<address>"; anything else is
// rejected with an error. Stream protocols (tcp, tcp4, tcp6, unix, unixpacket)
// are served by a streamSocketListener, wrapped in TLS when sl.ServerConfig
// yields a non-nil TLS configuration. Packet protocols (udp, udp4, udp6, ip,
// ip4, ip6, unixgram) are served by a packetSocketListener. In both cases the
// listen loop runs in a goroutine tracked by sl.wg, and the listener is stored
// in sl.Closer (wrapped in a unixCloser for unix-domain protocols).
//
// For unix-domain protocols any existing file at the address is removed before
// listening, and sl.SocketMode (an octal string), when set, is applied to the
// socket file afterwards.
//
// If any step after the socket has been opened fails, the socket is closed
// before the error is returned so it is not leaked.
func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
	sl.Accumulator = acc
	spl := strings.SplitN(sl.ServiceAddress, "://", 2)
	if len(spl) != 2 {
		return fmt.Errorf("invalid service address: %s", sl.ServiceAddress)
	}

	protocol := spl[0]
	addr := spl[1]
	isUnix := protocol == "unix" || protocol == "unixpacket" || protocol == "unixgram"

	if isUnix {
		// no good way of testing for "file does not exist".
		// Instead just ignore error and blow up when we try to listen, which will
		// indicate "address already in use" if file existed and we couldn't remove.
		//nolint:errcheck,revive
		os.Remove(addr)
	}

	// applySocketMode sets the configured permissions on the unix socket file.
	// It is a no-op for non-unix protocols or when no mode is configured.
	applySocketMode := func() error {
		if !isUnix || sl.SocketMode == "" {
			return nil
		}

		// Convert from octal in string to int
		mode, err := strconv.ParseUint(sl.SocketMode, 8, 32)
		if err != nil {
			return fmt.Errorf("parsing socket mode %q: %w", sl.SocketMode, err)
		}

		if err := os.Chmod(addr, os.FileMode(uint32(mode))); err != nil {
			return fmt.Errorf("setting socket mode on %q: %w", addr, err)
		}
		return nil
	}

	switch protocol {
	case "tcp", "tcp4", "tcp6", "unix", "unixpacket":
		tlsCfg, err := sl.ServerConfig.TLSConfig()
		if err != nil {
			return err
		}

		var l net.Listener
		if tlsCfg == nil {
			l, err = net.Listen(protocol, addr)
		} else {
			l, err = tls.Listen(protocol, addr, tlsCfg)
		}
		if err != nil {
			return err
		}

		// Set permissions on socket
		if err := applySocketMode(); err != nil {
			// sl.Closer has not been set yet, so nobody else can release the
			// listener; close it here. The close error is secondary to err.
			//nolint:errcheck,revive
			l.Close()
			return err
		}

		sl.Log.Infof("Listening on %s://%s", protocol, l.Addr())

		ssl := &streamSocketListener{
			Listener:       l,
			SocketListener: sl,
			sockType:       protocol,
		}

		sl.Closer = ssl
		sl.wg = sync.WaitGroup{}
		sl.wg.Add(1)
		go func() {
			defer sl.wg.Done()
			ssl.listen()
		}()
	case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
		decoder, err := internal.NewContentDecoder(sl.ContentEncoding)
		if err != nil {
			return err
		}

		pc, err := udpListen(protocol, addr)
		if err != nil {
			return err
		}

		// Set permissions on socket
		if err := applySocketMode(); err != nil {
			// sl.Closer has not been set yet, so nobody else can release the
			// connection; close it here. The close error is secondary to err.
			//nolint:errcheck,revive
			pc.Close()
			return err
		}

		if sl.ReadBufferSize > 0 {
			// A failure to size the read buffer is only worth a warning; the
			// listener still works with the default buffer.
			if srb, ok := pc.(setReadBufferer); ok {
				if err := srb.SetReadBuffer(int(sl.ReadBufferSize)); err != nil {
					sl.Log.Warnf("Setting read buffer on a %s socket failed: %v", protocol, err)
				}
			} else {
				sl.Log.Warnf("Unable to set read buffer on a %s socket", protocol)
			}
		}

		sl.Log.Infof("Listening on %s://%s", protocol, pc.LocalAddr())

		psl := &packetSocketListener{
			PacketConn:     pc,
			SocketListener: sl,
			decoder:        decoder,
		}

		sl.Closer = psl
		sl.wg = sync.WaitGroup{}
		sl.wg.Add(1)
		go func() {
			defer sl.wg.Done()
			psl.listen()
		}()
	default:
		return fmt.Errorf("unknown protocol '%s' in '%s'", protocol, sl.ServiceAddress)
	}

	if isUnix {
		// Wrap the closer together with the socket path for unix-domain sockets.
		sl.Closer = unixCloser{path: addr, closer: sl.Closer}
	}

	return nil
}