func()

in ext/datasource/file/refreshable_file.go [63:135]


func (s *RefreshableFileDataSource) Initialize() error {
	if !s.isInitialized.CompareAndSet(false, true) {
		return nil
	}

	err := s.doReadAndUpdate()
	if err != nil {
		logging.Error(err, "Fail to execute RefreshableFileDataSource.doReadAndUpdate")
	}

	w, err := fsnotify.NewWatcher()
	if err != nil {
		return errors.Errorf("Fail to new a watcher instance of fsnotify, err: %+v", err)
	}
	err = w.Add(s.sourceFilePath)
	if err != nil {
		return errors.Errorf("Fail add a watcher on file[%s], err: %+v", s.sourceFilePath, err)
	}
	s.watcher = w

	go util.RunWithRecover(func() {
		defer s.watcher.Close()
		for {
			select {
			case ev := <-s.watcher.Events:
				if ev.Op&fsnotify.Rename == fsnotify.Rename {
					logging.Warn("[RefreshableFileDataSource] The file source was renamed.", "sourceFilePath", s.sourceFilePath)
					updateErr := s.Handle(nil)
					if updateErr != nil {
						logging.Error(updateErr, "Fail to update nil property")
					}

					// try to watch sourceFile
					_ = s.watcher.Remove(s.sourceFilePath)
					retryCount := 0
					for {
						if retryCount > 5 {
							logging.Error(errors.New("retry failed"), "Fail to retry watch", "sourceFilePath", s.sourceFilePath)
							s.Close()
							return
						}
						e := s.watcher.Add(s.sourceFilePath)
						if e == nil {
							break
						}
						retryCount++
						logging.Error(e, "Failed to add to watcher", "sourceFilePath", s.sourceFilePath)
						util.Sleep(time.Second)
					}
				}
				if ev.Op&fsnotify.Remove == fsnotify.Remove {
					logging.Warn("[RefreshableFileDataSource] The file source was removed.", "sourceFilePath", s.sourceFilePath)
					updateErr := s.Handle(nil)
					if updateErr != nil {
						logging.Error(updateErr, "Fail to update nil property")
					}
					s.Close()
					return
				}

				err := s.doReadAndUpdate()
				if err != nil {
					logging.Error(err, "Fail to execute RefreshableFileDataSource.doReadAndUpdate")
				}
			case err := <-s.watcher.Errors:
				logging.Error(err, "Watch err on file", "sourceFilePath", s.sourceFilePath)
			case <-s.closeChan:
				return
			}
		}
	})
	return nil
}