in addons/addon-raas-s3-copy/packages/s3-synchronizer/src/s3-synchronizer.go [31:141]
func mainImpl(sess *session.Session, debug bool, recurringDownloads bool, stopRecurringDownloadsAfter int, downloadInterval int, stopUploadWatchersAfter int, concurrency int, defaultS3Mounts string, destinationBase string, region string) error {
// Use a map to emulate a set to keep track of existing mounts
currentMounts := make(map[string]struct{}, 0)
mountsCh := make(chan *mountConfiguration, 50)
// Create wait group to keep track of go routines being spawned
// so the main go routine can wait for them to completes
var wg sync.WaitGroup
// In another thread, get the next mount configuration from the buffered channel
// and download the files. If the share is marked as writeable then start the
// file watchers in another thread (because the setup function won't return)
go func() {
for {
mountConfig := <-mountsCh
if debug {
log.Printf("Received mount configuration from channel: %+v\n", mountConfig)
}
var sessionToUse *session.Session = sess
var studyId string = filepath.Base(mountConfig.destination)
if !(strings.TrimSpace(mountConfig.roleArn) == "") {
sessionToUse = makeSession(studyId, region)
}
bucket := mountConfig.bucket
awsRegion, err := s3manager.GetBucketRegion(context.Background(), sessionToUse, bucket, *sess.Config.Region)
if debug {
log.Println("Bucket", bucket, "region is", awsRegion)
}
if err != nil {
log.Println("Error getting region of the bucket", bucket, err)
} else {
sessionToUse = session.Must(session.NewSession(sessionToUse.Config))
sessionToUse.Config.WithRegion(awsRegion)
}
if recurringDownloads {
// Trigger recurring download
setupRecurringDownloads(&wg, sessionToUse, mountConfig, concurrency, debug, downloadInterval, stopRecurringDownloadsAfter)
} else {
downloadFiles(sessionToUse, mountConfig, concurrency, debug)
}
if mountConfig.writeable {
go func() {
err := setupUploadWatcher(&wg, sessionToUse, mountConfig, stopUploadWatchersAfter, debug)
if err != nil {
log.Printf("Error setting up file watcher: " + err.Error())
}
}()
}
if debug {
log.Printf("Decrement wg counter")
}
wg.Done() // Decrement wait group counter everytime we receive config from the mount channel and complete processing it
}
}()
if debug {
log.Println("Fetching environment info")
}
var s3MountsPtr *[]s3Mount
var err error
if defaultS3Mounts != "" {
s3MountsPtr, err = getDefaultMounts(defaultS3Mounts)
}
if err != nil {
log.Print("Error getting environment info: " + err.Error())
return err
}
var s3Mounts []s3Mount
if s3MountsPtr != nil {
s3Mounts = *s3MountsPtr
}
if debug {
log.Println("Parsing mounts...")
}
for _, mount := range s3Mounts {
s := mountToString(&mount)
_, exists := currentMounts[s]
if debug {
log.Printf("Mount: %v, Adding to mount queue: %t\n", *mount.Id, !exists)
}
if !exists {
destination := filepath.Join(destinationBase, *mount.Id)
config := newMountConfiguration(
*mount.Bucket,
*mount.Prefix,
destination,
*mount.Writeable,
*mount.KmsArn,
*mount.RoleArn,
)
wg.Add(1) // Increment wait group counter everytime we push config to the mount channel
if debug {
log.Printf("Increment wg counter")
}
mountsCh <- config
}
// Add to the currentMounts
currentMounts[s] = struct{}{}
}
wg.Wait() // Wait until all spawned go routines complete before existing the program
return nil
}