in scripts/go/analyzeCUR/analyzeCUR.go [681:802]
func main() {
	// initialize the AWS Go SDK session
sess := session.Must(session.NewSessionWithOptions(session.Options{SharedConfigState: session.SharedConfigEnable}))
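	// SharedConfigEnable lets the SDK pick up region and credentials from the shared AWS config/credentials files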
// Grab instance meta-data
meta := getInstanceMetadata(sess)
	// re-init the session now that we know the region we are running in
sess = sess.Copy(&aws.Config{Region: aws.String(meta["region"].(string))})
// Check if running on EC2
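	// the type assertion's ok flag is true only when the metadata contains an instanceId, i.e. we are on EC2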
_, ec2 := meta["instanceId"].(string)
var logger *cwlogger.Logger
	if ec2 { // init the CloudWatch Logs logger if we're running on EC2
		// assign with "=" (not ":=") so the outer logger is set rather than shadowed inside this block
		var err error
		logger, err = cwlogger.New(&cwlogger.Config{
			LogGroupName: "CURdashboard",
			Client:       cloudwatchlogs.New(sess),
		})
		if err != nil {
			log.Fatal("Could not initialize CloudWatch logger: " + err.Error())
		}
defer logger.Close()
logger.Log(time.Now(), "CURDashboard running on "+meta["instanceId"].(string)+" in "+meta["availabilityZone"].(string))
}
// read in command line params
var configFile, account, sourceBucket, destBucket, curReportName, curReportPath, curDestPath, dateOverride string
if err := getParams(&configFile, &sourceBucket, &destBucket, &account, &curReportName, &curReportPath, &curDestPath, &dateOverride); err != nil {
doLog(logger, err.Error())
return
}
// read in config file
var conf Config
	if err := getConfig(&conf, configFile); err != nil {
		doLog(logger, err.Error())
		return
	}
	// convert the CUR and get back its columns, the S3 path of the converted data and the billing period
	columns, s3Path, curDate, err := processCUR(sourceBucket, curReportName, curReportPath, curDestPath, destBucket, logger, dateOverride)
	if err != nil {
		doLog(logger, err.Error())
		return
	}
	// initialize the Athena and CloudWatch service clients
svcAthena := athena.New(sess)
svcCW := cloudwatch.New(sess)
	// make sure the Athena database exists - we don't care about the results
if _, err := sendQuery(svcAthena, "default", conf.Athena.DbSQL, meta["region"].(string), account); err != nil {
doLog(logger, "Could not create Athena Database: "+err.Error())
}
// make sure current Athena table exists
if err := createAthenaTable(svcAthena, conf.Athena.DbName, conf.Athena.TablePrefix, conf.Athena.TableSQL, columns, s3Path, curDate, meta["region"].(string), account); err != nil {
doLog(logger, "Could not create Athena Table: "+err.Error())
}
// // If RI analysis enabled - do it
// if conf.RI.Enabled {
// if err := riUtilization(sess, svcAthena, conf, key, secret, meta["region"].(string), account, date); err != nil {
// doLog(logger, err.Error())
// }
// }
// struct for a query job
type job struct {
svc *athena.Athena
db string
account string
region string
interval string
metric Metric
}
// channels for parallel execution
jobs := make(chan job)
done := make(chan bool)
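	// jobs is unbuffered, so the sends below block until a worker is free;
	// each worker signals on done once the jobs channel has been drained and closed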
	// create up to maxConcurrentQueries workers to process metric jobs
for w := 0; w < maxConcurrentQueries; w++ {
go func() {
for {
j, ok := <-jobs
if !ok {
done <- true
return
}
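				// substitute the database name, billing period and hourly/daily interval into the metric's SQL template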
sql := substituteParams(j.metric.SQL, map[string]string{"**DBNAME**": conf.Athena.DbName, "**DATE**": curDate, "**INTERVAL**": conf.MetricConfig.Substring[j.interval]})
results, err := sendQuery(j.svc, j.db, sql, j.region, j.account)
if err != nil {
doLog(logger, "Error querying Athena, SQL: "+sql+" , Error: "+err.Error())
continue
}
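				// publish the query results to CloudWatch as custom metrics in the configured namespace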
if err := sendMetric(svcCW, results, conf.General.Namespace, j.metric.CwName, j.metric.CwType, j.metric.CwDimension, j.interval); err != nil {
doLog(logger, "Error sending metric, name: "+j.metric.CwName+" , Error: "+err.Error())
}
}
}()
}
	// pass every enabled metric into the jobs channel for processing
for metric := range conf.Metrics {
if conf.Metrics[metric].Enabled {
if conf.Metrics[metric].Hourly {
jobs <- job{svcAthena, conf.Athena.DbName, account, meta["region"].(string), "hourly", conf.Metrics[metric]}
}
if conf.Metrics[metric].Daily {
jobs <- job{svcAthena, conf.Athena.DbName, account, meta["region"].(string), "daily", conf.Metrics[metric]}
}
}
}
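	// closing jobs ends each worker's receive loop once the remaining jobs are drained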
close(jobs)
	// wait for every worker to signal completion
for w := 0; w < maxConcurrentQueries; w++ {
<-done
}
}