func()

in plugins/internal/plugin/device_plugin.go [134:197]


func (p *DevicePlugin) StartServer() error {

	// Create a new gRPC server instance
	// (Note that this is necessary to support restarts, since a server instance cannot be reused after it has stopped serving)
	p.server = grpc.NewServer()

	// Register our service implementation with the gRPC server
	p.logger.Info("Registering the service implementation with the gRPC server")
	pluginapi.RegisterDevicePluginServer(p.server, p)

	// Append a timestamp to the filename for the gRPC server's Unix socket to ensure it is unique
	p.endpoint = filepath.Join(pluginapi.DevicePluginPathWindows, fmt.Sprintf("%s-%d.sock", p.name, time.Now().UnixMilli()))

	// Attempt to listen for connections on our Unix socket
	p.logger.Infow("Listening on endpoint", "endpoint", p.endpoint)
	listener, err := net.Listen("unix", p.endpoint)
	if err != nil {
		return err
	}

	// Create the shutdown channel for stopping the ListAndWatch streaming RPC
	p.stopListWatch = make(chan struct{})

	// Create a file deletion watcher for our Unix socket
	endpointDeleted, err := WatchForDeletion(p.endpoint)
	if err != nil {
		return err
	}

	// We detect Kubelet restarts by detecting the deletion of our socket
	p.endpointDeleted = endpointDeleted
	go func() {
		for {
			select {

			case err, ok := <-p.endpointDeleted.Errors:
				if !ok {
					p.logger.Info("DeletionWatcher error channel closed")
					return
				}
				p.Errors <- err

			case _, ok := <-p.endpointDeleted.Deleted:
				if !ok {
					p.logger.Info("DeletionWatcher deletion channel closed")
					return
				}

				p.logger.Info("Endpoint deletion detected, triggering a restart of the gRPC server")
				p.restart <- struct{}{}
			}
		}
	}()

	// Start the gRPC server in a new goroutine and send any errors back through our error channel
	go func() {
		p.logger.Info("Starting the gRPC server")
		if err := p.server.Serve(listener); err != nil {
			p.Errors <- err
		}
	}()

	return nil
}