func BatchWriteItem()

in workload-management/s3-trigger-ecs-task/s3-file-processor/utils/Dynamodb.go [129:226]


// BatchWriteItem puts the given records into the DynamoDB table named by
// tableName using svc.BatchWriteItem, retrying when the call is throttled or
// when the response reports unprocessed items.
//
// A fresh confirmation ID (uuid.NewString) is generated for every record and
// stored in the item's "ConfirmationId" attribute. Every attribute, including
// the numeric ones, is written as a DynamoDB string (S).
//
// It returns a map from each record's OrderId to the confirmation ID that was
// generated for it; if two records share an OrderId, the later one wins in
// the map. On any error the returned map is nil, even though some of the
// items may already have been written by an earlier attempt.
//
// An empty records slice is a no-op that returns an empty map and a nil
// error without calling DynamoDB. A nil entry in records yields an error.
//
// At most RetryCounter attempts are made; once they are used up the function
// returns a "max retries exceeded" error. Errors other than
// ProvisionedThroughputExceededException are returned immediately.
//
// NOTE(review): DynamoDB limits a single BatchWriteItem call to 25 write
// requests; this function does not chunk, so callers presumably pass at most
// 25 records — confirm.
// NOTE(review): retries are issued back to back with no delay; AWS recommends
// exponential backoff when retrying unprocessed items.
func BatchWriteItem(records []*Record) (map[string]string, error) {
	confirmations := make(map[string]string, len(records))
	if len(records) == 0 {
		// Nothing to write; sending an empty request list would only produce
		// a validation error from the service.
		return confirmations, nil
	}

	// str wraps a Go string as a DynamoDB string attribute.
	str := func(s string) *dynamodb.AttributeValue {
		return &dynamodb.AttributeValue{S: aws.String(s)}
	}

	pending := make([]*dynamodb.WriteRequest, 0, len(records))
	for i, rec := range records {
		if rec == nil {
			return nil, errors.New("nil record at index " + strconv.Itoa(i))
		}
		confirmationID := uuid.NewString()
		confirmations[rec.OrderId] = confirmationID
		pending = append(pending, &dynamodb.WriteRequest{
			PutRequest: &dynamodb.PutRequest{Item: map[string]*dynamodb.AttributeValue{
				"Region":         str(rec.Region),
				"Country":        str(rec.Country),
				"ItemType":       str(rec.ItemType),
				"SalesChannel":   str(rec.SalesChannel),
				"OrderPriority":  str(rec.OrderPriority),
				"OrderDate":      str(rec.OrderDate),
				"OrderId":        str(rec.OrderId),
				"ConfirmationId": str(confirmationID),
				"ShipDate":       str(rec.ShipDate),
				"UnitSold":       str(strconv.Itoa(rec.UnitSold)),
				"UnitPrice":      str(FormatFloat(rec.UnitPrice)),
				"TotalRevenue":   str(FormatFloat(rec.TotalRevenue)),
				"TotalCost":      str(FormatFloat(rec.TotalCost)),
				"TotalProfit":    str(FormatFloat(rec.TotalProfit)),
			}},
		})
	}

	// Retry on throttling and whenever the response still carries
	// unprocessed items; each such outcome consumes one attempt.
	for attempt := 0; attempt < RetryCounter; attempt++ {
		result, err := svc.BatchWriteItem(&dynamodb.BatchWriteItemInput{
			RequestItems: map[string][]*dynamodb.WriteRequest{
				tableName: pending,
			},
		})

		if err != nil {
			aerr, ok := err.(awserr.Error)
			if !ok {
				log.Printf("Unknown error encountered %s", err.Error())
				return nil, err
			}
			// Only throttling is retryable; anything else is fatal.
			if aerr.Code() != dynamodb.ErrCodeProvisionedThroughputExceededException {
				log.Printf("Error while performing batch write %s", aerr.Error())
				return nil, aerr
			}
		}

		var leftover []*dynamodb.WriteRequest
		if result != nil {
			leftover = result.UnprocessedItems[tableName]
		}
		if err == nil && len(leftover) == 0 {
			// Everything was accepted.
			return confirmations, nil
		}
		if len(leftover) > 0 {
			// Resend only what DynamoDB did not process.
			pending = leftover
		}
	}

	return nil, errors.New("max retries exceeded, unable to process the batch")
}