cmd/legacy_main.go (320 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A fuse file system for Google Cloud Storage buckets.
//
// Usage:
//
// gcsfuse [flags] bucket mount_point
package cmd
import (
"fmt"
"io/fs"
"os"
"os/signal"
"path"
"path/filepath"
"strings"
"golang.org/x/sys/unix"
"github.com/googlecloudplatform/gcsfuse/v2/cfg"
"github.com/googlecloudplatform/gcsfuse/v2/common"
"github.com/googlecloudplatform/gcsfuse/v2/internal/canned"
"github.com/googlecloudplatform/gcsfuse/v2/internal/locker"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/monitor"
"github.com/googlecloudplatform/gcsfuse/v2/internal/mount"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
"github.com/googlecloudplatform/gcsfuse/v2/internal/util"
"github.com/jacobsa/daemonize"
"github.com/jacobsa/fuse"
"github.com/kardianos/osext"
"golang.org/x/net/context"
)
const (
	// SuccessfulMountMessage is logged once the file system has been mounted
	// (by the daemonizing parent and by the process that performed the mount).
	SuccessfulMountMessage = "File system has been successfully mounted."

	// UnsuccessfulMountMessagePrefix prefixes the error that is logged and
	// reported to the parent process when mounting fails.
	UnsuccessfulMountMessagePrefix = "Error while mounting gcsfuse"
)
////////////////////////////////////////////////////////////////////////
// Helpers
////////////////////////////////////////////////////////////////////////
func registerTerminatingSignalHandler(mountPoint string, c *cfg.Config) {
// Register for SIGINT.
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, unix.SIGTERM)
// Start a goroutine that will unmount when the signal is received.
go func() {
for {
sig := <-signalChan
sigName := "undefined"
switch sig {
case unix.SIGTERM:
sigName = "SIGTERM"
case os.Interrupt:
sigName = "SIGINT"
}
logger.Infof("Received %s, attempting to unmount...", sigName)
err := fuse.Unmount(mountPoint)
if err != nil {
logger.Errorf("Failed to unmount in response to %s: %v", sigName, err)
} else {
logger.Infof("Successfully unmounted in response to %s.", sigName)
return
}
}
}()
}
// getUserAgent builds the user-agent string sent to GCS.
//
// The string always begins with "gcsfuse/<version>" and ends with
// "(Cfg:<config>)". The GPN tag is chosen in this order:
//   - if GCSFUSE_METADATA_IMAGE_TYPE is set, appName is placed after the
//     version and the tag is "gcsfuse-<image type>". All whitespace runs are
//     then collapsed to single spaces, which removes the gap an empty appName
//     would leave;
//   - otherwise, if appName is non-empty, the tag is "gcsfuse-<appName>";
//   - otherwise the tag is plain "gcsfuse".
func getUserAgent(appName string, config string) string {
	imageType := os.Getenv("GCSFUSE_METADATA_IMAGE_TYPE")
	version := common.GetVersion()

	switch {
	case imageType != "":
		raw := fmt.Sprintf("gcsfuse/%s %s (GPN:gcsfuse-%s) (Cfg:%s)", version, appName, imageType, config)
		return strings.Join(strings.Fields(raw), " ")
	case appName != "":
		return fmt.Sprintf("gcsfuse/%s (GPN:gcsfuse-%s) (Cfg:%s)", version, appName, config)
	default:
		return fmt.Sprintf("gcsfuse/%s (GPN:gcsfuse) (Cfg:%s)", version, config)
	}
}
// getConfigForUserAgent encodes a few feature toggles of mountConfig as a
// colon-separated string of "0"/"1" flags for the user agent, in this order:
//
//	fileCacheEnabled:cacheFileForRangeRead:parallelDownloadsEnabled:streamingWritesEnabled
func getConfigForUserAgent(mountConfig *cfg.Config) string {
	flag := func(enabled bool) string {
		if enabled {
			return "1"
		}
		return "0"
	}

	return fmt.Sprintf("%s:%s:%s:%s",
		flag(cfg.IsFileCacheEnabled(mountConfig)),
		flag(mountConfig.FileCache.CacheFileForRangeRead),
		flag(cfg.IsParallelDownloadsEnabled(mountConfig)),
		flag(mountConfig.Write.EnableStreamingWrites))
}
// createStorageHandle translates the GCS connection, auth and retry sections
// of newConfig (plus EnableHns and the given userAgent) into a
// storageutil.StorageClientConfig. It logs the user agent and returns whatever
// storage.NewStorageHandle produces for that config, using a background
// context.
func createStorageHandle(newConfig *cfg.Config, userAgent string) (storageHandle storage.StorageHandle, err error) {
	conn := newConfig.GcsConnection
	auth := newConfig.GcsAuth
	retries := newConfig.GcsRetries

	clientConfig := storageutil.StorageClientConfig{
		// Connection / transport settings.
		ClientProtocol:             conn.ClientProtocol,
		MaxConnsPerHost:            int(conn.MaxConnsPerHost),
		MaxIdleConnsPerHost:        int(conn.MaxIdleConnsPerHost),
		HttpClientTimeout:          conn.HttpClientTimeout,
		CustomEndpoint:             conn.CustomEndpoint,
		ExperimentalEnableJsonRead: conn.ExperimentalEnableJsonRead,
		GrpcConnPoolSize:           int(conn.GrpcConnPoolSize),
		UserAgent:                  userAgent,

		// Retry behaviour.
		MaxRetrySleep:        retries.MaxRetrySleep,
		MaxRetryAttempts:     int(retries.MaxRetryAttempts),
		RetryMultiplier:      retries.Multiplier,
		ReadStallRetryConfig: retries.ReadStall,

		// Authentication.
		KeyFile:           string(auth.KeyFile),
		AnonymousAccess:   auth.AnonymousAccess,
		TokenUrl:          auth.TokenUrl,
		ReuseTokenFromUrl: auth.ReuseTokenFromUrl,

		EnableHNS: newConfig.EnableHns,
	}

	logger.Infof("UserAgent = %s\n", clientConfig.UserAgent)
	return storage.NewStorageHandle(context.Background(), clientConfig)
}
////////////////////////////////////////////////////////////////////////
// main logic
////////////////////////////////////////////////////////////////////////
// Mount the file system according to arguments in the supplied context.
// mountWithArgs mounts bucketName at mountPoint according to newConfig.
//
// It first enables the locker debugging features requested in
// newConfig.Debug. Unless bucketName is canned.FakeBucketName, it then builds
// a user agent and a storage handle. For the fake bucket the storage handle is
// left at its zero value, because no real GCS connection is needed. Finally it
// delegates to mountWithStorageHandle with a background context. Errors from
// either step are wrapped with the name of the failing step.
func mountWithArgs(bucketName string, mountPoint string, newConfig *cfg.Config, metricHandle common.MetricHandle) (mfs *fuse.MountedFileSystem, err error) {
	if newConfig.Debug.ExitOnInvariantViolation {
		locker.EnableInvariantsCheck()
	}
	if newConfig.Debug.LogMutex {
		locker.EnableDebugMessages()
	}

	var storageHandle storage.StorageHandle
	if bucketName != canned.FakeBucketName {
		ua := getUserAgent(newConfig.AppName, getConfigForUserAgent(newConfig))
		logger.Info("Creating Storage handle...")
		if storageHandle, err = createStorageHandle(newConfig, ua); err != nil {
			return mfs, fmt.Errorf("failed to create storage handle using createStorageHandle: %w", err)
		}
	}

	logger.Infof("Creating a mount at %q\n", mountPoint)
	mfs, err = mountWithStorageHandle(context.Background(), bucketName, mountPoint, newConfig, storageHandle, metricHandle)
	if err != nil {
		return mfs, fmt.Errorf("mountWithStorageHandle: %w", err)
	}
	return mfs, nil
}
// populateArgs interprets the positional CLI arguments.
//
// It accepts either [mount_point] or [bucket mount_point]. With a single
// argument the bucket name is empty. Any other count yields a usage error.
// The mount point is resolved through util.GetResolvedPath so that it stays
// valid after the daemon changes its working directory.
func populateArgs(args []string) (
	bucketName string,
	mountPoint string,
	err error) {
	if n := len(args); n < 1 || n > 2 {
		prog := path.Base(os.Args[0])
		err = fmt.Errorf("%s takes one or two arguments. Run `%s --help` for more info", prog, prog)
		return
	}

	// The mount point is always the final positional argument; a bucket name,
	// when present, precedes it.
	mountPoint = args[len(args)-1]
	if len(args) == 2 {
		bucketName = args[0]
	}

	mountPoint, err = util.GetResolvedPath(mountPoint)
	if err != nil {
		err = fmt.Errorf("canonicalizing mount point: %w", err)
	}
	return
}
// callListRecursive walks every entry under mountPoint (including mountPoint
// itself) so that the file system's metadata is fetched up front.
//
// It counts the visited entries for the debug log. The walk stops at the first
// error reported by filepath.WalkDir, and that error is returned wrapped with
// the path being walked.
func callListRecursive(mountPoint string) (err error) {
	logger.Debugf("Started recursive metadata-prefetch of directory: \"%s\" ...", mountPoint)

	itemCount := 0
	visit := func(p string, entry fs.DirEntry, walkErr error) error {
		switch {
		case walkErr == nil:
			itemCount++
			return nil
		case entry == nil:
			// WalkDir passes a nil entry when the root itself could not be
			// stat'ed.
			return fmt.Errorf("got error walking: path=\"%s\" does not exist, error = %w", p, walkErr)
		default:
			return fmt.Errorf("got error walking: path=\"%s\", dentry=\"%s\", isDir=%v, error = %w", p, entry.Name(), entry.IsDir(), walkErr)
		}
	}

	if err = filepath.WalkDir(mountPoint, visit); err != nil {
		return fmt.Errorf("failed in recursive metadata-prefetch of directory: \"%s\"; error = %w", mountPoint, err)
	}

	logger.Debugf("... Completed recursive metadata-prefetch of directory: \"%s\". Number of items discovered: %v", mountPoint, itemCount)
	return nil
}
// isDynamicMount reports whether bucketName requests a dynamic mount, which
// is the case when no bucket was given ("") or the placeholder "_" was used.
func isDynamicMount(bucketName string) bool {
	switch bucketName {
	case "", "_":
		return true
	default:
		return false
	}
}
// Mount mounts the bucket named bucketName at mountPoint according to
// newConfig.
//
// When newConfig.Foreground is false, Mount re-runs the current executable
// through daemonize.Run and returns once that call does. The child gets
// "--foreground" prepended to the original CLI arguments, with the last
// argument replaced by mountPoint, plus a minimal environment.
//
// When newConfig.Foreground is true, Mount:
//   - sets up metrics and tracing;
//   - mounts the file system and, if requested, prefetches metadata;
//   - reports the outcome via daemonize.SignalOutcome;
//   - installs a SIGINT/SIGTERM unmount handler and blocks until the file
//     system is unmounted.
//
// The metric/trace exporters are shut down on every return path of the
// foreground branch.
func Mount(newConfig *cfg.Config, bucketName, mountPoint string) (err error) {
	// Ideally this call to SetLogFormat (which internally creates a new
	// defaultLogger) would be an else branch of the 'if newConfig.Foreground'
	// check below, but currently that would mean some logs don't honour the
	// user-provided log-format.
	logger.SetLogFormat(newConfig.Logging.Format)
	if newConfig.Foreground {
		if err = logger.InitLogFile(newConfig.Logging); err != nil {
			return fmt.Errorf("init log file: %w", err)
		}
	}

	logger.Infof("Start gcsfuse/%s for app %q using mount point: %s\n", common.GetVersion(), newConfig.AppName, mountPoint)
	// Log the effective config to the log-file, or to stdout if there is no
	// log-file. A daemonized parent whose child already logs to a log-file
	// skips this; otherwise the config would appear twice (stdout and
	// log-file).
	if newConfig.Foreground || newConfig.Logging.FilePath == "" {
		logger.Info("GCSFuse config", "config", newConfig)
	}

	// The following will not warn if the user explicitly passed the default value for StatCacheCapacity.
	if newConfig.MetadataCache.DeprecatedStatCacheCapacity != mount.DefaultStatCacheCapacity {
		logger.Warnf("Deprecated flag stat-cache-capacity used! Please switch to config parameter 'metadata-cache: stat-cache-max-size-mb'.")
	}
	// The following will not warn if the user explicitly passed the default value for StatCacheTTL or TypeCacheTTL.
	if newConfig.MetadataCache.DeprecatedStatCacheTtl != mount.DefaultStatOrTypeCacheTTL || newConfig.MetadataCache.DeprecatedTypeCacheTtl != mount.DefaultStatOrTypeCacheTTL {
		logger.Warnf("Deprecated flag stat-cache-ttl and/or type-cache-ttl used! Please switch to config parameter 'metadata-cache: ttl-secs' .")
	}

	// If we haven't been asked to run in foreground mode, we should run a daemon
	// with the foreground flag set and wait for it to mount.
	if !newConfig.Foreground {
		// Find the executable.
		var execPath string
		execPath, err = osext.Executable()
		if err != nil {
			err = fmt.Errorf("osext.Executable: %w", err)
			return
		}

		// Set up arguments. Be sure to use foreground mode, and to send along the
		// potentially-modified mount point (assumed to be the last CLI argument).
		args := append([]string{"--foreground"}, os.Args[1:]...)
		args[len(args)-1] = mountPoint

		// Pass along PATH so that the daemon can find fusermount on Linux.
		env := []string{
			fmt.Sprintf("PATH=%s", os.Getenv("PATH")),
		}

		// Pass along GOOGLE_APPLICATION_CREDENTIALS, since we document in
		// mounting.md that it can be used for specifying a key file.
		if p, ok := os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS"); ok {
			env = append(env, fmt.Sprintf("GOOGLE_APPLICATION_CREDENTIALS=%s", p))
		}

		// Pass through the https_proxy/http_proxy environment variable,
		// in case the host requires a proxy server to reach the GCS endpoint.
		// https_proxy has precedence over http_proxy, in case both are set.
		if p, ok := os.LookupEnv("https_proxy"); ok {
			env = append(env, fmt.Sprintf("https_proxy=%s", p))
			fmt.Fprintf(
				os.Stdout,
				"Added environment https_proxy: %s\n",
				p)
		} else if p, ok := os.LookupEnv("http_proxy"); ok {
			env = append(env, fmt.Sprintf("http_proxy=%s", p))
			fmt.Fprintf(
				os.Stdout,
				"Added environment http_proxy: %s\n",
				p)
		}

		// Pass through the no_proxy environment variable. It accompanies the
		// http(s)_proxy variables and lists the hosts for which the proxy
		// should be bypassed.
		if p, ok := os.LookupEnv("no_proxy"); ok {
			env = append(env, fmt.Sprintf("no_proxy=%s", p))
			fmt.Fprintf(
				os.Stdout,
				"Added environment no_proxy: %s\n",
				p)
		}

		// Pass the parent process working directory to the child process via an
		// environment variable. The child uses it to resolve relative paths.
		if parentProcessExecutionDir, wdErr := os.Getwd(); wdErr == nil {
			env = append(env, fmt.Sprintf("%s=%s", util.GCSFUSE_PARENT_PROCESS_DIR,
				parentProcessExecutionDir))
		}

		// The parent process doesn't pass $HOME to the child process
		// implicitly, so pass it explicitly. Check UserHomeDir's own error, not
		// an unrelated outer one, so an empty HOME= is never exported.
		if homeDir, homeErr := os.UserHomeDir(); homeErr == nil {
			env = append(env, fmt.Sprintf("HOME=%s", homeDir))
		}

		// This environment variable distinguishes the main process from the
		// daemon process: if it is set, the program is running as the daemon.
		env = append(env, fmt.Sprintf("%s=true", logger.GCSFuseInBackgroundMode))

		// <log-file>.stderr captures the standard error (stderr) output of the
		// gcsfuse background process.
		var stderrFile *os.File
		if newConfig.Logging.FilePath != "" {
			stderrFileName := string(newConfig.Logging.FilePath) + ".stderr"
			if stderrFile, err = os.OpenFile(stderrFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644); err != nil {
				return fmt.Errorf("opening daemon stderr file %q: %w", stderrFileName, err)
			}
			// The parent only needs this handle to start the daemon and never
			// writes to it, so release it (ignoring the Close error) on return.
			defer stderrFile.Close()
		}

		// Run.
		err = daemonize.Run(execPath, args, env, os.Stdout, stderrFile)
		if err != nil {
			return fmt.Errorf("daemonize.Run: %w", err)
		}
		logger.Infof(SuccessfulMountMessage)
		return nil
	}

	ctx := context.Background()
	var metricExporterShutdownFn common.ShutdownFn
	metricHandle := common.NewNoopMetrics()
	if cfg.IsMetricsEnabled(&newConfig.Metrics) {
		metricExporterShutdownFn = monitor.SetupOTelMetricExporters(ctx, newConfig)
		// Metrics are best-effort: fall back to the no-op handle, but say so
		// instead of silently dropping the error.
		otelHandle, metricsErr := common.NewOTelMetrics()
		if metricsErr != nil {
			logger.Warnf("Failed to create OTel metric handle, falling back to no-op metrics: %v", metricsErr)
		} else {
			metricHandle = otelHandle
		}
	}
	shutdownTracingFn := monitor.SetupTracing(ctx, newConfig)
	shutdownFn := common.JoinShutdownFunc(metricExporterShutdownFn, shutdownTracingFn)
	// Shut down the metric and trace exporters on every exit path, including
	// mount and metadata-prefetch failures, not just after a clean unmount.
	defer func() {
		if shutdownFn == nil {
			return
		}
		if shutdownErr := shutdownFn(ctx); shutdownErr != nil {
			logger.Errorf("Error while shutting down metrics/trace exporters: %v", shutdownErr)
		}
	}()

	// Mount, writing information about our progress to the writer that package
	// daemonize gives us and telling it about the outcome.
	var mfs *fuse.MountedFileSystem
	{
		mfs, err = mountWithArgs(bucketName, mountPoint, newConfig, metricHandle)

		// This utility absorbs the error returned by daemonize.SignalOutcome
		// calls by simply logging it.
		callDaemonizeSignalOutcome := func(err error) {
			if err2 := daemonize.SignalOutcome(err); err2 != nil {
				logger.Errorf("Failed to signal error to parent-process from daemon: %v", err2)
			}
		}

		markSuccessfulMount := func() {
			// Print the success message in the log-file/stdout depending on what the logger is set to.
			logger.Info(SuccessfulMountMessage)
			callDaemonizeSignalOutcome(nil)
		}

		markMountFailure := func(err error) {
			// Logging here duplicates the error on the console in foreground
			// mode, but it is needed so the error isn't lost in background mode.
			logger.Errorf("%s: %v\n", UnsuccessfulMountMessagePrefix, err)
			err = fmt.Errorf("%s: mountWithArgs: %w", UnsuccessfulMountMessagePrefix, err)
			callDaemonizeSignalOutcome(err)
		}

		if err != nil {
			markMountFailure(err)
			return err
		}

		if !isDynamicMount(bucketName) {
			switch newConfig.MetadataCache.ExperimentalMetadataPrefetchOnMount {
			case cfg.ExperimentalMetadataPrefetchOnMountSynchronous:
				if err = callListRecursive(mountPoint); err != nil {
					// The file system is already mounted at this point; unmount it
					// before reporting failure so no dangling mount is left behind.
					if unmountErr := fuse.Unmount(mfs.Dir()); unmountErr != nil {
						logger.Errorf("Failed to unmount %q after metadata-prefetch failure: %v", mfs.Dir(), unmountErr)
					}
					markMountFailure(err)
					return err
				}
			case cfg.ExperimentalMetadataPrefetchOnMountAsynchronous:
				go func() {
					if err := callListRecursive(mountPoint); err != nil {
						logger.Errorf("Metadata-prefetch failed: %v", err)
					}
				}()
			}
		}

		markSuccessfulMount()
	}

	// Let the user unmount with Ctrl-C (SIGINT) or SIGTERM.
	registerTerminatingSignalHandler(mfs.Dir(), newConfig)

	// Wait for the file system to be unmounted. The deferred shutdown of the
	// exporters runs after this returns.
	if err = mfs.Join(ctx); err != nil {
		err = fmt.Errorf("MountedFileSystem.Join: %w", err)
	}
	return err
}