func main()

in scripts/go/analyzeCUR/analyzeCUR.go [681:802]


// main is the program entry point. It converts the current CUR via
// processCUR, makes sure the Athena database and table exist, and then runs
// every enabled metric query through a bounded pool of workers, sending each
// result set to CloudWatch with sendMetric.
//
// Failures while reading parameters, loading the config file or converting
// the CUR abort the run, because the later steps depend on their results.
// Failures creating the Athena database/table and per-metric failures are
// logged and the run continues.
func main() {

	// initialize AWS GO client
	sess := session.Must(session.NewSessionWithOptions(session.Options{SharedConfigState: session.SharedConfigEnable}))

	// Grab instance meta-data
	meta := getInstanceMetadata(sess)

	// The region is needed in several places below; assert it once.
	// NOTE(review): this panics if getInstanceMetadata does not set a string
	// "region" entry (same as the original code) - confirm it always does.
	region := meta["region"].(string)

	// re-init session now we have the region we are in
	sess = sess.Copy(&aws.Config{Region: aws.String(region)})

	// Check if running on EC2
	instanceID, onEC2 := meta["instanceId"].(string)
	var logger *cwlogger.Logger
	if onEC2 { // Init Cloudwatch Logger class if were running on EC2
		// Assign with "=" rather than ":=" so the function-level logger is
		// set. Using ":=" here would declare a new logger scoped to this
		// block and leave the outer one nil for every doLog call below.
		var err error
		logger, err = cwlogger.New(&cwlogger.Config{
			LogGroupName: "CURdashboard",
			Client:       cloudwatchlogs.New(sess),
		})
		if err != nil {
			log.Fatal("Could not initalize Cloudwatch logger: " + err.Error())
		}
		defer logger.Close()
		logger.Log(time.Now(), "CURDashboard running on "+instanceID+" in "+meta["availabilityZone"].(string))
	}

	// read in command line params
	var configFile, account, sourceBucket, destBucket, curReportName, curReportPath, curDestPath, dateOverride string
	if err := getParams(&configFile, &sourceBucket, &destBucket, &account, &curReportName, &curReportPath, &curDestPath, &dateOverride); err != nil {
		doLog(logger, err.Error())
		return
	}

	// read in config file
	var conf Config
	if err := getConfig(&conf, configFile); err != nil {
		doLog(logger, err.Error())
		// Without a config there is no SQL, DB name or metric list to work with.
		return
	}

	// convert CUR
	columns, s3Path, curDate, err := processCUR(sourceBucket, curReportName, curReportPath, curDestPath, destBucket, logger, dateOverride)
	if err != nil {
		doLog(logger, err.Error())
		// columns, s3Path and curDate are not meaningful when err is non-nil.
		return
	}

	// initialize Athena class
	svcAthena := athena.New(sess)
	svcCW := cloudwatch.New(sess)

	// make sure Athena DB exists - dont care about results
	if _, err := sendQuery(svcAthena, "default", conf.Athena.DbSQL, region, account); err != nil {
		doLog(logger, "Could not create Athena Database: "+err.Error())
	}

	// make sure current Athena table exists
	if err := createAthenaTable(svcAthena, conf.Athena.DbName, conf.Athena.TablePrefix, conf.Athena.TableSQL, columns, s3Path, curDate, region, account); err != nil {
		doLog(logger, "Could not create Athena Table: "+err.Error())
	}

	// // If RI analysis enabled - do it
	// if conf.RI.Enabled {
	// 	if err := riUtilization(sess, svcAthena, conf, key, secret, meta["region"].(string), account, date); err != nil {
	// 		doLog(logger, err.Error())
	// 	}
	// }

	// struct for a query job
	type job struct {
		svc      *athena.Athena
		db       string
		account  string
		region   string
		interval string
		metric   Metric
	}

	// channels for parallel execution: jobs carries work to the workers,
	// done receives one signal per worker once jobs is closed and drained.
	jobs := make(chan job)
	done := make(chan struct{})

	// create upto maxConcurrentQueries workers to process metric jobs
	for w := 0; w < maxConcurrentQueries; w++ {
		go func() {
			// The range ends when jobs is closed and empty.
			for j := range jobs {
				sql := substituteParams(j.metric.SQL, map[string]string{"**DBNAME**": conf.Athena.DbName, "**DATE**": curDate, "**INTERVAL**": conf.MetricConfig.Substring[j.interval]})
				results, err := sendQuery(j.svc, j.db, sql, j.region, j.account)
				if err != nil {
					// A failed query only skips this metric; the worker keeps going.
					doLog(logger, "Error querying Athena, SQL: "+sql+" , Error: "+err.Error())
					continue
				}

				if err := sendMetric(svcCW, results, conf.General.Namespace, j.metric.CwName, j.metric.CwType, j.metric.CwDimension, j.interval); err != nil {
					doLog(logger, "Error sending metric, name: "+j.metric.CwName+" , Error: "+err.Error())
				}
			}
			done <- struct{}{}
		}()
	}

	// pass every enabled metric into channel for processing; a metric can be
	// queued twice, once per interval it is enabled for.
	for _, m := range conf.Metrics {
		if !m.Enabled {
			continue
		}
		if m.Hourly {
			jobs <- job{svcAthena, conf.Athena.DbName, account, region, "hourly", m}
		}
		if m.Daily {
			jobs <- job{svcAthena, conf.Athena.DbName, account, region, "daily", m}
		}
	}
	close(jobs)

	// wait for jobs to complete
	for w := 0; w < maxConcurrentQueries; w++ {
		<-done
	}
}