func mainImpl()

in addons/addon-raas-s3-copy/packages/s3-synchronizer/src/s3-synchronizer.go [31:141]


func mainImpl(sess *session.Session, debug bool, recurringDownloads bool, stopRecurringDownloadsAfter int, downloadInterval int, stopUploadWatchersAfter int, concurrency int, defaultS3Mounts string, destinationBase string, region string) error {
	// Use a map to emulate a set to keep track of existing mounts
	currentMounts := make(map[string]struct{}, 0)
	mountsCh := make(chan *mountConfiguration, 50)

	// Create wait group to keep track of go routines being spawned
	// so the main go routine can wait for them to completes
	var wg sync.WaitGroup

	// In another thread, get the next mount configuration from the buffered channel
	// and download the files. If the share is marked as writeable then start the
	// file watchers in another thread (because the setup function won't return)
	go func() {
		for {
			mountConfig := <-mountsCh
			if debug {
				log.Printf("Received mount configuration from channel: %+v\n", mountConfig)
			}
			var sessionToUse *session.Session = sess
			var studyId string = filepath.Base(mountConfig.destination)
			if !(strings.TrimSpace(mountConfig.roleArn) == "") {
				sessionToUse = makeSession(studyId, region)
			}
			bucket := mountConfig.bucket
			awsRegion, err := s3manager.GetBucketRegion(context.Background(), sessionToUse, bucket, *sess.Config.Region)
			if debug {
				log.Println("Bucket", bucket, "region is", awsRegion)
			}
			if err != nil {
				log.Println("Error getting region of the bucket", bucket, err)
			} else {
				sessionToUse = session.Must(session.NewSession(sessionToUse.Config))
				sessionToUse.Config.WithRegion(awsRegion)
			}
			if recurringDownloads {
				// Trigger recurring download
				setupRecurringDownloads(&wg, sessionToUse, mountConfig, concurrency, debug, downloadInterval, stopRecurringDownloadsAfter)
			} else {
				downloadFiles(sessionToUse, mountConfig, concurrency, debug)
			}
			if mountConfig.writeable {
				go func() {
					err := setupUploadWatcher(&wg, sessionToUse, mountConfig, stopUploadWatchersAfter, debug)
					if err != nil {
						log.Printf("Error setting up file watcher: " + err.Error())
					}
				}()
			}
			if debug {
				log.Printf("Decrement wg counter")
			}
			wg.Done() // Decrement wait group counter everytime we receive config from the mount channel and complete processing it
		}
	}()

	if debug {
		log.Println("Fetching environment info")
	}

	var s3MountsPtr *[]s3Mount
	var err error
	if defaultS3Mounts != "" {
		s3MountsPtr, err = getDefaultMounts(defaultS3Mounts)
	}

	if err != nil {
		log.Print("Error getting environment info: " + err.Error())
		return err
	}

	var s3Mounts []s3Mount
	if s3MountsPtr != nil {
		s3Mounts = *s3MountsPtr
	}

	if debug {
		log.Println("Parsing mounts...")
	}
	for _, mount := range s3Mounts {
		s := mountToString(&mount)
		_, exists := currentMounts[s]

		if debug {
			log.Printf("Mount: %v, Adding to mount queue: %t\n", *mount.Id, !exists)
		}

		if !exists {
			destination := filepath.Join(destinationBase, *mount.Id)
			config := newMountConfiguration(
				*mount.Bucket,
				*mount.Prefix,
				destination,
				*mount.Writeable,
				*mount.KmsArn,
				*mount.RoleArn,
			)
			wg.Add(1) // Increment wait group counter everytime we push config to the mount channel
			if debug {
				log.Printf("Increment wg counter")
			}
			mountsCh <- config
		}

		// Add to the currentMounts
		currentMounts[s] = struct{}{}
	}

	wg.Wait() // Wait until all spawned go routines complete before existing the program

	return nil
}