func setupUploadWatcher()

in addons/addon-raas-s3-copy/packages/s3-synchronizer/src/s3-upload-file-watcher.go [21:236]


// setupUploadWatcher starts the background goroutines that mirror local changes
// under config.destination to the S3 location config.bucket/config.prefix.
//
// Parameters:
//   - wg: passed through to runFileWatcherLoop; not used directly here.
//   - sess: AWS session handed to the S3 upload/delete helpers.
//   - config: supplies the local directory to watch (destination), the target
//     bucket and prefix, and the KMS key id passed to uploads.
//   - stopUploadWatchersAfter: when > 0, the number of seconds the main loop waits
//     for a restart signal before it exits; when <= 0 the main loop never times out.
//   - debug: enables verbose logging.
//
// It returns an error only when the sync directory does not exist and cannot be
// created. Everything else runs asynchronously and reports problems via the log.
func setupUploadWatcher(wg *sync.WaitGroup, sess *session.Session, config *mountConfiguration, stopUploadWatchersAfter int, debug bool) error {
	syncDir := config.destination
	bucket := config.bucket
	prefix := config.prefix
	kmsKeyId := config.kmsKeyId

	if debug {
		log.Println("syncDir: " + syncDir + " bucket: " + bucket + " prefix: " + prefix)
	}

	// This shouldn't happen, but make the directory if it doesn't exist.
	// Without the directory there is nothing to watch, so a failure to create it
	// is reported to the caller instead of being silently dropped.
	if _, err := os.Stat(syncDir); os.IsNotExist(err) {
		if err := os.MkdirAll(syncDir, os.ModePerm); err != nil {
			return err
		}
	}

	// Create a channel for sub directories that get added
	// These directories require explicit crawling to sync files up to S3 instead of just relying on file watcher
	// This is required to capture the following edge case:
	// 1. User directly creates a file and sub directories in one operation
	// 2. This will result in OS firing CREATE events for the sub directories and the file
	// 3. The code in processFileWatcherEvent will capture the CREATE event for the directories and attach
	//	   new file watchers
	// 4. The initial CREATE event of the file is never captured in this case because that event was fired
	//	  BEFORE we could complete registration of the file watchers for the created sub directories
	// To work around this timing issue, we need to enqueue these directories and crawl them after they are
	// registered for file watching. This crawling is for catch up of any files that existed in that directory
	//	before the watching began.
	dirRequiringCrawlCh := make(chan string, 1000)

	// There are two primary loops (running in go routines - similar to threads)
	// 1. THE MAIN LOOP: It takes care of starting new file watcher go routine everytime it receives a signal from "startNewWatcherLoopCh" channel below.
	//					 The main loop stops when it times out i.e., when stopUploadWatchersAfter time elapses (if applicable)
	//
	// 2. THE FILE WATCHER LOOP: It receives events from the directory watcher and reacts to those events.
	//							 The file watcher loop receives STOP signal from "stopWatcherLoopCh" channel below to stop.
	//							 A stop signal is pushed to "stopWatcherLoopCh" either due to timeout or
	//							 when the file watcher loop needs to be restarted with new directory watcher instance.
	startNewWatcherLoopCh := make(chan bool, 1)
	stopWatcherLoopCh := make(chan bool, 1000)

	// addDirsToFileWatcher registers syncDir and all of its children directories
	// with the given watcher. Failures are logged, not returned.
	addDirsToFileWatcher := func(watcher *dirWatcher) {
		err := filepath.Walk(
			syncDir,
			watchDirFactory(watcher, dirRequiringCrawlCh, debug))

		if err != nil {
			log.Printf("Error setting up file watcher: %v\n", err)
		}
	}

	// processFileWatcherEvent reacts to a single file system event:
	//   - RENAME/REMOVE: deletes the old object (or directory) from S3.
	//   - WRITE/CREATE:  uploads the file, or starts watching a newly created directory.
	// Events for paths matched by excludeFile are ignored for all four event kinds.
	processFileWatcherEvent := func(watcher *dirWatcher, event *fsnotify.Event) {
		if debug {
			log.Println("event:", event)
		}

		renamedOrRemoved := event.Op&fsnotify.Rename == fsnotify.Rename || event.Op&fsnotify.Remove == fsnotify.Remove
		writtenOrCreated := event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create
		if !renamedOrRemoved && !writtenOrCreated {
			return
		}

		// The exclusion must apply to every handled event kind. Writing it inline as
		// `A || B && !excluded` would only guard B, because && binds tighter than ||.
		if excludeFile(event.Name) {
			return
		}

		if renamedOrRemoved {
			if debug {
				log.Println("renamed or deleted file:", event.Name)
			}

			if !watcher.IsBeingWatched(event.Name) {
				// When file is renamed event.Name has the file's old name
				// Rename will also cause "Create" event for the file with new name if the file is moved
				// to a directory that is also monitored so delete old file from S3
				deleteFromS3(sess, syncDir, event.Name, bucket, prefix, debug)
				return
			}

			if debug {
				log.Printf("\nDirectory being watched is renamed or deleted: %v\n\n", event.Name)
			}
			// Directory that was being watched is renamed or deleted
			// When dir is renamed event.Name has the dir's old name
			// Remove the directory from the file watcher
			watcher.UnwatchDir(event.Name)
			// If it's rename, it will also cause "Create" event for the dir with new name if the dir is moved
			// to a directory that is also monitored so delete the older directory from S3
			deleteDirFromS3(sess, syncDir, event.Name, bucket, prefix, debug)
			return
		}

		// WRITE or CREATE from here on.
		if debug {
			log.Println("modified file:", event.Name)
		}

		// First check that this is a file
		fi, err := os.Stat(event.Name)
		if err != nil {
			if !os.IsNotExist(err) {
				log.Println("Unable to stat file", err)
				return
			}

			// We just received WRITE or CREATE event for the file but the file does not exist on the file system
			// This can happen on Windows when a folder is renamed and new file is created or modified in the
			// renamed folder
			// Somehow, windows generates CREATE/WRITE events for the file under the old path
			// For example, on Windows,
			// When you manually create a directory, it's created as "New directory" first and then when
			// you rename it to say "d1" and add a file say "f1" to the directory, Windows generates file system
			// CREATE event for file "New directory\f1" instead of "d1\f1"
			if debug {
				log.Println("Received CREATE or WRITE event for ", event.Name, " but the file or directory does not exist. This can happen when directory is renamed on Windows. Stopping existing file watcher loop and starting a new one.")
			}

			// In this case restart the watcher and let it re-watch all the way from the root of the mount i.e., syncDir
			// Send stop signal to the loop running the current watcher
			if debug {
				log.Println("Sending STOP signal to existing file watcher loop")
			}
			stopWatcherLoopCh <- true
			if debug {
				log.Println("Sent STOP signal to existing file watcher loop")
			}

			// Send signal to start new watcher loop
			// NOTE(review): this send blocks while a start signal is already pending
			// (buffer size 1) and nothing is receiving — confirm that is acceptable.
			if debug {
				log.Println("Sending START signal to start new file watcher loop")
			}
			startNewWatcherLoopCh <- true
			if debug {
				log.Println("Sent START signal to start new file watcher loop")
			}
			return
		}

		if fi.Mode().IsDir() {
			if event.Op&fsnotify.Create != fsnotify.Create {
				// A WRITE on an already known directory carries nothing to upload.
				if debug {
					log.Println(event.Name, "is a directory, skipping")
				}
				return
			}

			if debug {
				log.Println(event.Name, "is a new directory, watching")
			}
			if err := filepath.Walk(
				event.Name,
				watchDirFactory(watcher, dirRequiringCrawlCh, debug),
			); err != nil {
				log.Println("Unable to watch directory", err)
			}
			return
		}

		uploadToS3(sess, syncDir, event.Name, bucket, prefix, kmsKeyId, debug)
	}

	// uploadDir crawls dirToUpload to catch up on entries that may have been
	// created before a watcher was attached: every directory found is walked
	// with watchDirFactory and every other entry is uploaded to S3.
	uploadDir := func(watcher *dirWatcher, dirToUpload string, debug bool) {
		if debug {
			log.Println("Crawling directory", dirToUpload, "to upload file to s3 who may have been missed by file watcher")
		}
		if err := filepath.Walk(
			dirToUpload,
			func(path string, fi os.FileInfo, err error) error {
				// fi is nil when the walk could not stat the entry; such entries are
				// skipped and the walk error itself is intentionally not propagated.
				if fi == nil {
					return nil
				}

				if fi.Mode().IsDir() {
					if debug {
						log.Println(path, "is a new directory, watching")
					}
					if err := filepath.Walk(
						path,
						watchDirFactory(watcher, dirRequiringCrawlCh, debug),
					); err != nil {
						log.Println("Unable to watch directory", err)
					}
					return nil
				}

				if debug {
					log.Println("Uploading file", path, "to S3")
				}
				uploadToS3(sess, syncDir, path, bucket, prefix, kmsKeyId, debug)
				return nil
			},
		); err != nil {
			log.Println("Unable to upload directory", err)
		}
	}

	// THE MAIN LOOP
	go func() {
		for {
			// Receiving from a nil channel blocks forever, so when no timeout is
			// configured the select below only ever waits for start signals.
			// The timer is re-armed on every iteration, i.e. the timeout counts from
			// the most recent watcher (re)start.
			var timeoutCh <-chan time.Time
			if stopUploadWatchersAfter > 0 {
				timeoutCh = time.After(time.Duration(stopUploadWatchersAfter) * time.Second)
			}

			select {
			case <-timeoutCh:
				if debug {
					log.Printf("\n\n THE MAIN LOOP TIMEOUT \n\n")
				}
				return
			case <-startNewWatcherLoopCh:
				if debug {
					log.Printf("\n\n RECEIVED SIGNAL TO START NEW FILE WATCHER \n\n")
				}
				watcher := NewDirWatcher(debug)
				// Start consuming events before registering directories so that events
				// fired during registration are not left without a receiver.
				go runFileWatcherLoop(wg, watcher, stopUploadWatchersAfter, &dirRequiringCrawlCh, uploadDir, debug, processFileWatcherEvent, &stopWatcherLoopCh)
				addDirsToFileWatcher(watcher)
			}
		}
	}()

	// Send signal to the channel to start new file watcher
	startNewWatcherLoopCh <- true

	return nil
}