in workload-management/s3-trigger-ecs-task/s3-file-processor/utils/Dynamodb.go [129:226]
// BatchWriteItem writes the given records to the DynamoDB table named by
// tableName using a single BatchWriteItem request, retrying up to
// RetryCounter times when the call is throttled or when the response reports
// unprocessed items.
//
// A fresh confirmation ID (UUID) is generated for every record and stored with
// it in the "ConfirmationId" attribute. Every attribute, including the numeric
// ones, is written as a DynamoDB string (S) value.
//
// On success it returns a map from each record's OrderId to the confirmation
// ID generated for it. If several records share an OrderId, the map keeps the
// confirmation ID of the last one. An empty input returns an empty map without
// contacting DynamoDB. On failure the returned map is nil and the error is
// either the error reported by the SDK, an error for a nil record, or a
// "max retries exceeded" error once the retry budget is used up.
//
// NOTE(review): this function sends all records in one request and does not
// split the input; DynamoDB documents a per-call limit on the number of write
// requests, so callers are presumably expected to pass suitably sized
// batches. Confirm against the caller.
//
// NOTE(review): retries are issued immediately, with no delay between
// attempts. AWS recommends exponential backoff for unprocessed items.
func BatchWriteItem(records []*Record) (map[string]string, error) {
	response := make(map[string]string, len(records))
	if len(records) == 0 {
		// Nothing to write; an empty request list would only be rejected by
		// the service, so skip the round trip.
		return response, nil
	}

	request := make([]*dynamodb.WriteRequest, 0, len(records))
	for _, v := range records {
		if v == nil {
			// Fail explicitly instead of panicking on the dereference below.
			return nil, errors.New("nil record in batch")
		}
		confirmationID := uuid.NewString()
		response[v.OrderId] = confirmationID
		request = append(request, &dynamodb.WriteRequest{
			PutRequest: &dynamodb.PutRequest{Item: map[string]*dynamodb.AttributeValue{
				"Region":         {S: aws.String(v.Region)},
				"Country":        {S: aws.String(v.Country)},
				"ItemType":       {S: aws.String(v.ItemType)},
				"SalesChannel":   {S: aws.String(v.SalesChannel)},
				"OrderPriority":  {S: aws.String(v.OrderPriority)},
				"OrderDate":      {S: aws.String(v.OrderDate)},
				"OrderId":        {S: aws.String(v.OrderId)},
				"ConfirmationId": {S: aws.String(confirmationID)},
				"ShipDate":       {S: aws.String(v.ShipDate)},
				"UnitSold":       {S: aws.String(strconv.Itoa(v.UnitSold))},
				"UnitPrice":      {S: aws.String(FormatFloat(v.UnitPrice))},
				"TotalRevenue":   {S: aws.String(FormatFloat(v.TotalRevenue))},
				"TotalCost":      {S: aws.String(FormatFloat(v.TotalCost))},
				"TotalProfit":    {S: aws.String(FormatFloat(v.TotalProfit))},
			}},
		})
	}

	// Each pass through the loop consumes one unit of the retry budget. A
	// pass ends the function early only on full success or on an error that
	// is not a throughput-exceeded throttle.
	for attempt := 0; attempt < RetryCounter; attempt++ {
		result, err := svc.BatchWriteItem(&dynamodb.BatchWriteItemInput{
			RequestItems: map[string][]*dynamodb.WriteRequest{
				tableName: request,
			},
		})
		if err != nil {
			var aerr awserr.Error
			if !errors.As(err, &aerr) {
				log.Printf("Unknown error encountered %s", err.Error())
				return nil, err
			}
			if aerr.Code() != dynamodb.ErrCodeProvisionedThroughputExceededException {
				log.Printf("Error while performing batch write %s", aerr.Error())
				return nil, aerr
			}
			// Throttled: fall through and retry.
		}

		// Narrow the next attempt to whatever DynamoDB reported as not yet
		// written; if nothing was reported, the same request is resent.
		var pending []*dynamodb.WriteRequest
		if result != nil {
			pending = result.UnprocessedItems[tableName]
		}
		if len(pending) > 0 {
			request = pending
			continue
		}
		if err == nil {
			// Call succeeded and nothing is left over: the batch is done.
			return response, nil
		}
	}
	return nil, errors.New("max retries exceeded, unable to process the batch")
}