in internal/pipewatcher/pipewatcher_linux.go [91:158]
func (hdl *Handle) Run(ctx context.Context, evType string) (bool, any, error) {
var canceled atomic.Bool
for hdl.isWaitingWrite() {
time.Sleep(10 * time.Millisecond)
}
// Channel used to cancel the context cancelation go routine.
// Used when the Watcher is returning to the event manager.
cancelContext := make(chan bool)
defer close(cancelContext)
// Cancelation handling code.
go func() {
select {
case <-cancelContext:
break
case <-ctx.Done():
canceled.Store(true)
galog.V(2).Errorf("Context canceled, closing pipe: %s", hdl.options.PipePath)
// Open the pipe as O_RDONLY to release the blocking open O_WRONLY.
pipeFile, err := os.OpenFile(hdl.options.PipePath, os.O_RDONLY, 0644)
if err != nil {
galog.Errorf("Failed to open readonly pipe: %+v", err)
return
}
defer func() {
if err := pipeFile.Close(); err != nil {
galog.Errorf("Failed to close readonly pipe: %+v", err)
}
if err := os.Remove(hdl.options.PipePath); err != nil {
galog.Errorf("Failed to remove pipe: %+v", err)
}
}()
}
}()
// If the configured named pipe doesn't exists we create it before emitting
// events from it.
if err := createNamedPipe(ctx, hdl.options.PipePath, hdl.options.Mode); err != nil {
return true, nil, err
}
// Open the pipe as writeonly, it will block until a read is performed from
// the other end of the pipe.
pipeFile, err := os.OpenFile(hdl.options.PipePath, os.O_WRONLY, 0644)
if err != nil {
return true, nil, err
}
// Have we got a ctx.Done()? if so lets just return from here and unregister
// the watcher.
if canceled.Load() {
if err := pipeFile.Close(); err != nil {
galog.Errorf("Failed to close readonly pipe: %+v", err)
}
return false, nil, nil
}
cancelContext <- true
hdl.setWaitingWrite(true)
return true, NewPipeData(pipeFile, hdl.finishedCb), nil
}