in pkg/broker/awsbroker.go [24:104]
func NewAWSBroker(o Options, awssess GetAwsSession, clients AwsClients, getCallerId GetCallerIder, updateCatalog UpdateCataloger, pollUpdate PollUpdater, mc *MetricsCollector) (*AwsBroker, error) {
sess := awssess(o.KeyID, o.SecretKey, o.Region, "", o.Profile, map[string]string{})
s3sess := awssess(o.KeyID, o.SecretKey, o.S3Region, "", o.Profile, map[string]string{})
s3svc := clients.NewS3(s3sess)
ddbsvc := clients.NewDdb(sess)
stssvc := clients.NewSts(sess)
callerid, err := getCallerId(stssvc)
if err != nil {
return &AwsBroker{}, err
}
accountid := *callerid.Account
accountuuid := uuid.NewV5(uuid.NullUUID{}.UUID, accountid+o.BrokerID)
glog.Infof("Running as caller identity '%+v'.", callerid)
var db Db
db.Brokerid = o.BrokerID
db.Accountid = accountid
db.Accountuuid = accountuuid
// connect DynamoDB adapter to storage port
db.DataStorePort = dynamodbadapter.DdbDataStore{
Accountid: accountid,
Accountuuid: accountuuid,
Brokerid: o.BrokerID,
Region: o.Region,
Ddb: *ddbsvc,
Tablename: o.TableName,
}
// setup in memory cache
var catalogcache = cache.NewMemoryWithTTL(time.Duration(CacheTTL))
var listingcache = cache.NewMemoryWithTTL(time.Duration(CacheTTL))
listingcache.StartGC(time.Minute * 5)
bd := &BucketDetailsRequest{
o.S3Bucket,
o.S3Key,
o.TemplateFilter,
}
// retrieve AWS partition from instance metadata service
partition, err := ec2metadata.New(sess).GetMetadata("/services/partition")
if err != nil {
partition = "aws" // no access to metadata service, defaults to AWS Standard Partition
}
// populate broker variables
bl := AwsBroker{
accountId: accountid,
keyid: o.KeyID,
secretkey: o.SecretKey,
profile: o.Profile,
tablename: o.TableName,
s3bucket: o.S3Bucket,
s3region: o.S3Region,
s3key: addTrailingSlash(o.S3Key),
templatefilter: o.TemplateFilter,
region: o.Region,
partition: partition,
s3svc: s3svc,
catalogcache: catalogcache,
listingcache: listingcache,
brokerid: o.BrokerID,
db: db,
GetSession: awssess,
Clients: clients,
prescribeOverrides: o.PrescribeOverrides,
globalOverrides: getGlobalOverrides(o.BrokerID),
metrics: mc,
}
// get catalog and setup periodic updates from S3
err = updateCatalog(listingcache, catalogcache, *bd, s3svc, db, bl, ListTemplates, ListingUpdate, MetadataUpdate)
if err != nil {
return &AwsBroker{}, err
}
go pollUpdate(600, listingcache, catalogcache, *bd, s3svc, db, bl, updateCatalog, ListTemplates)
return &bl, nil
}