internal/pipewatcher/pipewatcher.go (59 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package pipewatcher implements a generic pipe event watcher. This watcher // will create/trigger an event when a pipe is opened for reading. package pipewatcher import ( "os" "sync" ) // Handle is the event watcher implementation. type Handle struct { // watcherID is the event watcher id. watcherID string // options are the watcher options. options Options // waitingWrite is a flag to inform the Watcher that the Handler has or // hasn't finished writing. waitingWrite bool // mutex protects waitingWrite on concurrent accesses. mutex sync.Mutex } // Options are the watcher's extra options. type Options struct { // PipePath is the pipe path. PipePath string // Mode is the pipe mode. Mode uint32 // ReadEventID is the event id for the read event. ReadEventID string } // New allocates and initializes a new Watcher. func New(watcherID string, opts Options) *Handle { return &Handle{ watcherID: watcherID, options: opts, } } // ID returns the event watcher id. func (hdl *Handle) ID() string { return hdl.watcherID } // Events returns an slice with all implemented events. func (hdl *Handle) Events() []string { return []string{hdl.options.ReadEventID} } // PipeData wraps the pipe event data. type PipeData struct { // File is the writeonly pipe's file descriptor. The user/handler must // make sure to close it after processing the event. file *os.File // mu protects pipeData from concurrent accesses. mu sync.Mutex // Finished is a callback used by the event handler to inform the write to // the pipe is finished. finishedCb func() } // NewPipeData allocates and initializes a new PipeData. func NewPipeData(file *os.File, finishedCb func()) *PipeData { return &PipeData{ file: file, finishedCb: finishedCb, } } // Finished is a callback used by the event handler to inform the write to // the pipe is finished. func (pd *PipeData) Finished() { pd.finishedCb() } // WriteString writes the data to the pipe. func (pd *PipeData) WriteString(data string) (int, error) { pd.mu.Lock() defer pd.mu.Unlock() return pd.file.WriteString(data) } // Close closes the pipe. func (pd *PipeData) Close() error { pd.mu.Lock() defer pd.mu.Unlock() if pd.file == nil { return nil } if err := pd.file.Close(); err != nil { return err } pd.file = nil return nil }