in dth/service.go [135:174]
func (ss *SqsService) SendMessageInBatch(ctx context.Context, batch []*string) {
// log.Printf("Sending %d messages to Queue in batch ", len(batch))
// Assume batch size <= 10
entries := make([]types.SendMessageBatchRequestEntry, len(batch), len(batch))
for id, body := range batch {
idstr := strconv.Itoa(id)
// fmt.Printf("Id is %s\n", idstr)
entry := types.SendMessageBatchRequestEntry{
Id: &idstr,
MessageBody: body,
}
entries[id] = entry
}
input := &sqs.SendMessageBatchInput{
QueueUrl: &ss.queueURL,
Entries: entries,
}
// Sometimes, there will be unexpected exception on SendMessageBatch
// The standard retryer doesn't work
// Add another retry layer - each time 30 seconds
retry := 0
for retry <= 5 {
_, err := ss.client.SendMessageBatch(ctx, input)
if err != nil {
retry++
log.Printf("Failed to send the messages in batch to SQS Queue - %s - Retry %d time(s)\n", err.Error(), retry)
time.Sleep(time.Second * 30)
} else {
return
}
}
// log.Printf("Sent %d messages successfully\n", len(resp.Successful))
}