in addons/addon-raas-s3-copy/packages/s3-synchronizer/src/s3-upload-file-watcher.go [21:236]
// setupUploadWatcher starts the background machinery that reacts to file system changes
// under config.destination and mirrors them to S3 (config.bucket / config.prefix).
//
// It launches "the main loop" goroutine, which creates a fresh dirWatcher and starts a
// file watcher loop (runFileWatcherLoop) every time a signal arrives on an internal
// start channel, and then sends the first start signal itself before returning.
//
// Parameters:
//   - wg: handed through to runFileWatcherLoop.
//   - sess: session handed to the S3 upload/delete helpers.
//   - config: mount configuration; destination, bucket, prefix and kmsKeyId are read.
//   - stopUploadWatchersAfter: when > 0, the main loop exits once this many seconds pass
//     without receiving a start signal; when <= 0 the main loop never times out.
//   - debug: enables verbose logging.
//
// The returned error is always nil; set-up problems are logged instead.
func setupUploadWatcher(wg *sync.WaitGroup, sess *session.Session, config *mountConfiguration, stopUploadWatchersAfter int, debug bool) error {
	syncDir := config.destination
	bucket := config.bucket
	prefix := config.prefix
	kmsKeyID := config.kmsKeyId

	if debug {
		log.Println("syncDir: " + syncDir + " bucket: " + bucket + " prefix: " + prefix)
	}

	// This shouldn't happen, but make the directory if it doesn't exist
	if _, err := os.Stat(syncDir); os.IsNotExist(err) {
		if mkErr := os.MkdirAll(syncDir, os.ModePerm); mkErr != nil {
			// Keep going: the directory walk below will report the problem again, but
			// the root cause should not be silently lost.
			log.Printf("Unable to create sync directory %v: %v\n", syncDir, mkErr)
		}
	}

	// Create a channel for sub directories that get added
	// These directories require explicit crawling to sync files up to S3 instead of just relying on file watcher
	// This is required to capture the following edge case:
	// 1. User directly creates a file and sub directories in one operation
	// 2. This will result in OS firing CREATE events for the sub directories and the file
	// 3. The code in processFileWatcherEvent will capture the CREATE event for the directories and attach
	//    new file watchers
	// 4. The initial CREATE event of the file is never captured in this case because that event was fired
	//    BEFORE we could complete registration of the file watchers for the created sub directories
	// To work around this timing issue, we need to enqueue these directories and crawl them after they are
	// registered for file watching. This crawling is for catch up of any files that existed in that directory
	// before the watching began.
	dirRequiringCrawlCh := make(chan string, 1000)

	// There are two primary loops (running in go routines - similar to threads)
	// 1. THE MAIN LOOP: It takes care of starting a new file watcher go routine every time it receives a
	//    signal from the "startNewWatcherLoopCh" channel below.
	//    The main loop stops when it times out i.e., when stopUploadWatchersAfter seconds elapse without a
	//    start signal (if applicable)
	//
	// 2. THE FILE WATCHER LOOP: It receives events from the directory watcher and reacts to those events.
	//    The file watcher loop receives a STOP signal from the "stopWatcherLoopCh" channel below to stop.
	//    A stop signal is pushed to "stopWatcherLoopCh" either due to timeout or
	//    when the file watcher loop needs to be restarted with a new directory watcher instance.
	startNewWatcherLoopCh := make(chan bool, 1)
	stopWatcherLoopCh := make(chan bool, 1000)

	// addDirsToFileWatcher registers syncDir and all of its children directories with the watcher.
	addDirsToFileWatcher := func(watcher *dirWatcher) {
		err := filepath.Walk(
			syncDir,
			watchDirFactory(watcher, dirRequiringCrawlCh, debug))
		if err != nil {
			log.Printf("Error setting up file watcher: %v\n", err)
		}
	}

	// processFileWatcherEvent reacts to a single file system event: rename/remove events delete the
	// corresponding object(s) from S3, write/create events upload files and start watching new directories.
	// Events for excluded files are ignored.
	processFileWatcherEvent := func(watcher *dirWatcher, event *fsnotify.Event) {
		if debug {
			log.Println("event:", event)
		}

		renamed := event.Op&fsnotify.Rename == fsnotify.Rename
		removed := event.Op&fsnotify.Remove == fsnotify.Remove
		written := event.Op&fsnotify.Write == fsnotify.Write
		created := event.Op&fsnotify.Create == fsnotify.Create

		// NOTE: the operation checks are grouped explicitly. "&&" binds tighter than "||" in Go, so
		// without the parentheses the exclusion filter would only apply to the second operation of
		// each pair (Remove / Create) and excluded files would still be synced on Rename / Write.
		switch {
		case (renamed || removed) && !excludeFile(event.Name):
			if debug {
				log.Println("renamed or deleted file:", event.Name)
			}
			if watcher.IsBeingWatched(event.Name) {
				if debug {
					log.Printf("\nDirectory being watched is renamed or deleted: %v\n\n", event.Name)
				}
				// Directory that was being watched is renamed or deleted
				// When dir is renamed event.Name has the dir's old name
				// Remove the directory from the file watcher
				watcher.UnwatchDir(event.Name)

				// If it's rename, it will also cause "Create" event for the dir with new name if the dir is moved
				// to a directory that is also monitored so delete the older directory from S3
				deleteDirFromS3(sess, syncDir, event.Name, bucket, prefix, debug)
				return
			}
			// When file is renamed event.Name has the file's old name
			// Rename will also cause "Create" event for the file with new name if the file is moved
			// to a directory that is also monitored so delete old file from S3
			deleteFromS3(sess, syncDir, event.Name, bucket, prefix, debug)

		case (written || created) && !excludeFile(event.Name):
			if debug {
				log.Println("modified file:", event.Name)
			}

			// First check that this is a file
			fi, err := os.Stat(event.Name)
			if os.IsNotExist(err) {
				// We just received WRITE or CREATE event for the file but the file does not exist on the file system
				// This can happen on Windows when a folder is renamed and new file is created or modified in the
				// renamed folder
				// Somehow, windows generates CREATE/WRITE events for the file under the old path
				// For example, on Windows,
				// When you manually create a directory, it's created as "New directory" first and then when
				// you rename it to say "d1" and add a file say "f1" to the directory, Windows generates file system
				// CREATE event for file "New directory\f1" instead of "d1\f1"
				if debug {
					log.Println("Received CREATE or WRITE event for ", event.Name, " but the file or directory does not exist. This can happen when directory is renamed on Windows. Stopping existing file watcher loop and starting a new one.")
				}

				// In this case restart the watcher and let it re-watch all the way from the root of the mount i.e., syncDir
				// Send stop signal to the loop running the current watcher
				if debug {
					log.Println("Sending STOP signal to existing file watcher loop")
				}
				stopWatcherLoopCh <- true
				if debug {
					log.Println("Sent STOP signal to existing file watcher loop")
				}

				// Send signal to start new watcher loop
				if debug {
					log.Println("Sending START signal to start new file watcher loop")
				}
				startNewWatcherLoopCh <- true
				if debug {
					log.Println("Sent START signal to start new file watcher loop")
				}
				return
			}
			if err != nil {
				log.Println("Unable to stat file", err)
				return
			}

			if !fi.Mode().IsDir() {
				uploadToS3(sess, syncDir, event.Name, bucket, prefix, kmsKeyID, debug)
				return
			}

			if !created {
				// WRITE events on directories carry nothing to upload.
				if debug {
					log.Println(event.Name, "is a directory, skipping")
				}
				return
			}

			if debug {
				log.Println(event.Name, "is a new directory, watching")
			}
			if err := filepath.Walk(
				event.Name,
				watchDirFactory(watcher, dirRequiringCrawlCh, debug),
			); err != nil {
				log.Println("Unable to watch directory", err)
			}
		}
	}

	// uploadDir crawls dirToUpload, registering every directory it finds with the watcher and uploading
	// every file to S3. It is used to catch up on files the file watcher may have missed.
	uploadDir := func(watcher *dirWatcher, dirToUpload string, debug bool) {
		if debug {
			log.Println("Crawling directory", dirToUpload, "to upload file to s3 who may have been missed by file watcher")
		}
		if err := filepath.Walk(
			dirToUpload,
			func(path string, fi os.FileInfo, err error) error {
				if err != nil {
					// Report the problem but keep crawling; the remaining entries may still be fine.
					log.Println("Error while crawling", path, err)
				}
				if fi == nil {
					return nil
				}
				if fi.Mode().IsDir() {
					if debug {
						log.Println(path, "is a new directory, watching")
					}
					if err := filepath.Walk(
						path,
						watchDirFactory(watcher, dirRequiringCrawlCh, debug),
					); err != nil {
						log.Println("Unable to watch directory", err)
					}
					return nil
				}
				if debug {
					log.Println("Uploading file", path, "to S3")
				}
				uploadToS3(sess, syncDir, path, bucket, prefix, kmsKeyID, debug)
				return nil
			},
		); err != nil {
			log.Println("Unable to upload directory", err)
		}
	}

	// startWatcher creates a new directory watcher, starts a file watcher loop for it and then registers
	// the whole syncDir tree with that watcher.
	startWatcher := func() {
		if debug {
			log.Printf("\n\n RECEIVED SIGNAL TO START NEW FILE WATCHER \n\n")
		}
		watcher := NewDirWatcher(debug)
		go runFileWatcherLoop(wg, watcher, stopUploadWatchersAfter, &dirRequiringCrawlCh, uploadDir, debug, processFileWatcherEvent, &stopWatcherLoopCh)
		addDirsToFileWatcher(watcher)
	}

	// THE MAIN LOOP
	go func() {
		for {
			// A nil channel is never ready, so when no timeout is configured the select below
			// simply waits for start signals forever. The timeout is re-armed on every iteration,
			// i.e. it restarts each time a start signal has been handled.
			var timeoutCh <-chan time.Time
			if stopUploadWatchersAfter > 0 {
				timeoutCh = time.After(time.Duration(stopUploadWatchersAfter) * time.Second)
			}

			select {
			case <-timeoutCh:
				if debug {
					log.Printf("\n\n THE MAIN LOOP TIMEOUT \n\n")
				}
				return
			case <-startNewWatcherLoopCh:
				startWatcher()
			}
		}
	}()

	// Send signal to the channel to start new file watcher
	startNewWatcherLoopCh <- true

	return nil
}