func()

in telemetry/telemetrybuffer.go [92:192]


// StartServer starts listening on FdName and serves telemetry clients in the
// background.
//
// If Listen fails, the error is logged and returned, and tb.FdExists is set to
// whether the error text contains "in use" or "Access is denied". On success an
// accept loop is started in its own goroutine and nil is returned immediately.
//
// Every accepted connection is appended to tb.connections (under tb.mutex) and
// handled by a dedicated goroutine that reads Delimiter-terminated JSON
// messages. Messages containing a "CniSucceeded" key are decoded as CNIReport,
// messages containing a "Metric" key as AIMetric, and the decoded value is sent
// on tb.data. Messages with neither key are logged and skipped. A read error or
// any decode error ends that connection's goroutine. The accept loop itself
// ends on the first Accept error.
func (tb *TelemetryBuffer) StartServer() error {
	err := tb.Listen(FdName)
	if err != nil {
		// NOTE(review): these substrings presumably identify an endpoint that
		// is already owned by another process — confirm against Listen.
		tb.FdExists = strings.Contains(err.Error(), "in use") || strings.Contains(err.Error(), "Access is denied")
		if tb.logger != nil {
			tb.logger.Error("Listen returns", zap.Error(err))
		} else {
			log.Logf("Listen returns: %v", err.Error())
		}
		return err
	}

	if tb.logger != nil {
		tb.logger.Info("Telemetry service started")
	} else {
		log.Logf("Telemetry service started")
	}

	// logDecodeError reports a JSON decode failure through whichever logger is
	// configured.
	logDecodeError := func(msg string, decodeErr error) {
		if tb.logger != nil {
			tb.logger.Error(msg, zap.Error(decodeErr))
		} else {
			log.Logf("%s:%v", msg, decodeErr)
		}
	}

	// serveConn reads reports from a single client until a read or decode
	// error occurs, then closes and unregisters the connection.
	serveConn := func(conn net.Conn) {
		defer func() {
			tb.mutex.Lock()
			defer tb.mutex.Unlock()

			// Close and drop the connection only if it is still registered in
			// tb.connections.
			for i, registered := range tb.connections {
				if registered == conn {
					// Best-effort close: the goroutine is exiting either way.
					conn.Close()
					tb.connections = remove(tb.connections, i)
					break
				}
			}
		}()

		reader := bufio.NewReader(conn)
		for {
			payload, readErr := reader.ReadBytes(Delimiter)
			if readErr != nil {
				return
			}
			// ReadBytes only returns a nil error when the data ends with the
			// delimiter, so there is always a trailing byte to strip.
			payload = payload[:len(payload)-1]

			// Decode generically first to discover which report type was sent.
			var fields map[string]interface{}
			if decodeErr := json.Unmarshal(payload, &fields); decodeErr != nil {
				logDecodeError("StartServer: unmarshal error", decodeErr)
				return
			}

			_, isCNIReport := fields["CniSucceeded"]
			_, isAIMetric := fields["Metric"]

			switch {
			case isCNIReport:
				var cniReport CNIReport
				if decodeErr := json.Unmarshal(payload, &cniReport); decodeErr != nil {
					logDecodeError("StartServer: unmarshal CNIReport error", decodeErr)
					return
				}
				tb.data <- cniReport
			case isAIMetric:
				var aiMetric AIMetric
				if decodeErr := json.Unmarshal(payload, &aiMetric); decodeErr != nil {
					logDecodeError("StartServer: unmarshal AIMetric error", decodeErr)
					return
				}
				tb.data <- aiMetric
			default:
				if tb.logger != nil {
					tb.logger.Info("StartServer: default", zap.Any("case", fields))
				} else {
					log.Logf("StartServer: default case:%+v...", fields)
				}
			}
		}
	}

	// Accept loop: register each new connection and hand it to its own worker
	// goroutine.
	go func() {
		for {
			conn, acceptErr := tb.listener.Accept()
			if acceptErr != nil {
				if tb.logger != nil {
					tb.logger.Error("Telemetry Server accept error", zap.Error(acceptErr))
				} else {
					log.Logf("Telemetry Server accept error %v", acceptErr)
				}
				return
			}

			tb.mutex.Lock()
			tb.connections = append(tb.connections, conn)
			tb.mutex.Unlock()

			go serveConn(conn)
		}
	}()

	return nil
}