func()

in internal/pipewatcher/pipewatcher_linux.go [91:158]


// Run waits for a reader on the configured named pipe and hands the opened
// write end back to the caller wrapped in pipe data.
//
// It first polls isWaitingWrite until it reports false, creates the named pipe
// if needed and then opens it write-only, which blocks until the other end is
// opened for reading. A helper go routine watches ctx: on cancelation it opens
// the pipe read-only to release the blocked write-only open and removes the
// pipe afterwards.
//
// Return values:
//   - (true, pipe data, nil) when the pipe was opened and is ready for writing;
//   - (true, nil, err) when creating or opening the pipe failed;
//   - (false, nil, nil) when ctx was canceled, so the watcher gets unregistered.
//
// evType is not used by this implementation.
func (hdl *Handle) Run(ctx context.Context, evType string) (bool, any, error) {
	var canceled atomic.Bool

	// Don't start a new cycle while a previous write is still pending
	// (presumably the flag is cleared through hdl.finishedCb — confirm).
	for hdl.isWaitingWrite() {
		time.Sleep(10 * time.Millisecond)
	}

	// Channel used to cancel the context cancelation go routine.
	// Used when the Watcher is returning to the event manager.
	cancelContext := make(chan bool)
	defer close(cancelContext)

	// Closed when the cancelation go routine exits. It lets Run notice that the
	// go routine already left through the ctx.Done() path, in which case nobody
	// will ever receive from cancelContext.
	watcherDone := make(chan struct{})

	// Cancelation handling code.
	go func() {
		// Registered first so it runs last, after the pipe cleanup below.
		defer close(watcherDone)

		select {
		case <-cancelContext:
			return
		case <-ctx.Done():
			canceled.Store(true)

			galog.V(2).Errorf("Context canceled, closing pipe: %s", hdl.options.PipePath)

			// Open the pipe as O_RDONLY to release the blocking open O_WRONLY.
			pipeFile, err := os.OpenFile(hdl.options.PipePath, os.O_RDONLY, 0644)
			if err != nil {
				galog.Errorf("Failed to open readonly pipe: %+v", err)
				return
			}

			defer func() {
				if err := pipeFile.Close(); err != nil {
					galog.Errorf("Failed to close readonly pipe: %+v", err)
				}

				if err := os.Remove(hdl.options.PipePath); err != nil {
					galog.Errorf("Failed to remove pipe: %+v", err)
				}
			}()
		}
	}()

	// If the configured named pipe doesn't exist we create it before emitting
	// events from it.
	if err := createNamedPipe(ctx, hdl.options.PipePath, hdl.options.Mode); err != nil {
		return true, nil, err
	}

	// Open the pipe as writeonly, it will block until a read is performed from
	// the other end of the pipe.
	pipeFile, err := os.OpenFile(hdl.options.PipePath, os.O_WRONLY, 0644)
	if err != nil {
		return true, nil, err
	}

	// Stop the cancelation go routine before handing the pipe over. A plain
	// send on cancelContext would block forever if ctx got canceled after the
	// canceled flag was checked, so also watch for the go routine having
	// already exited through the ctx.Done() path.
	stopped := false
	if !canceled.Load() {
		select {
		case cancelContext <- true:
			stopped = true
		case <-watcherDone:
			// Lost the race against ctx.Done(); handled as a cancelation below.
		}
	}

	// Have we got a ctx.Done()? if so lets just return from here and unregister
	// the watcher.
	if !stopped {
		if err := pipeFile.Close(); err != nil {
			galog.Errorf("Failed to close writeonly pipe: %+v", err)
		}
		return false, nil, nil
	}

	hdl.setWaitingWrite(true)

	return true, NewPipeData(pipeFile, hdl.finishedCb), nil
}