internal/pipewatcher/pipewatcher_linux.go (100 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build linux
package pipewatcher
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"sync/atomic"
"syscall"
"time"
"github.com/GoogleCloudPlatform/galog"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/run"
)
// Create a named pipe if it doesn't exist.
func createNamedPipe(ctx context.Context, pipePath string, mode uint32) error {
pipeDir := filepath.Dir(pipePath)
_, err := os.Stat(pipeDir)
if err != nil && os.IsNotExist(err) {
// The perm 0755 is compatible with distros /etc/ directory.
if err := os.MkdirAll(pipeDir, 0755); err != nil {
return err
}
}
if _, err := os.Stat(pipePath); err != nil {
if os.IsNotExist(err) {
if err := syscall.Mkfifo(pipePath, mode); err != nil {
return fmt.Errorf("failed to create named pipe: %+v", err)
}
} else {
return fmt.Errorf("failed to stat file: " + pipePath)
}
}
restorecon, err := exec.LookPath("restorecon")
if err != nil {
galog.Infof("No restorecon available, not restoring SELinux context of: %s", pipePath)
return nil
}
opts := run.Options{Name: restorecon, Args: []string{pipePath}, OutputType: run.OutputNone}
if _, err := run.WithContext(ctx, opts); err != nil {
return fmt.Errorf("failed to restore SELinux context of: %s, %w", pipePath, err)
}
return nil
}
// finishedCb is used by the event handler to communicate the write to the
// pipe is finished, it's exposed via PipeData.Finished pointer.
func (hdl *Handle) finishedCb() {
hdl.setWaitingWrite(false)
}
// isWaitingWrite returns true if the watcher is waiting for a write to the
// pipe.
func (hdl *Handle) isWaitingWrite() bool {
hdl.mutex.Lock()
defer hdl.mutex.Unlock()
return hdl.waitingWrite
}
// setWaitingWrite sets the waitingWrite flag to the given value.
func (hdl *Handle) setWaitingWrite(val bool) {
hdl.mutex.Lock()
defer hdl.mutex.Unlock()
hdl.waitingWrite = val
}
// Run listens to the watcher's pipe open calls and report back the event.
func (hdl *Handle) Run(ctx context.Context, evType string) (bool, any, error) {
var canceled atomic.Bool
for hdl.isWaitingWrite() {
time.Sleep(10 * time.Millisecond)
}
// Channel used to cancel the context cancelation go routine.
// Used when the Watcher is returning to the event manager.
cancelContext := make(chan bool)
defer close(cancelContext)
// Cancelation handling code.
go func() {
select {
case <-cancelContext:
break
case <-ctx.Done():
canceled.Store(true)
galog.V(2).Errorf("Context canceled, closing pipe: %s", hdl.options.PipePath)
// Open the pipe as O_RDONLY to release the blocking open O_WRONLY.
pipeFile, err := os.OpenFile(hdl.options.PipePath, os.O_RDONLY, 0644)
if err != nil {
galog.Errorf("Failed to open readonly pipe: %+v", err)
return
}
defer func() {
if err := pipeFile.Close(); err != nil {
galog.Errorf("Failed to close readonly pipe: %+v", err)
}
if err := os.Remove(hdl.options.PipePath); err != nil {
galog.Errorf("Failed to remove pipe: %+v", err)
}
}()
}
}()
// If the configured named pipe doesn't exists we create it before emitting
// events from it.
if err := createNamedPipe(ctx, hdl.options.PipePath, hdl.options.Mode); err != nil {
return true, nil, err
}
// Open the pipe as writeonly, it will block until a read is performed from
// the other end of the pipe.
pipeFile, err := os.OpenFile(hdl.options.PipePath, os.O_WRONLY, 0644)
if err != nil {
return true, nil, err
}
// Have we got a ctx.Done()? if so lets just return from here and unregister
// the watcher.
if canceled.Load() {
if err := pipeFile.Close(); err != nil {
galog.Errorf("Failed to close readonly pipe: %+v", err)
}
return false, nil, nil
}
cancelContext <- true
hdl.setWaitingWrite(true)
return true, NewPipeData(pipeFile, hdl.finishedCb), nil
}