func()

in firehose/firehose.go [328:365]


// processAPIResponse inspects the result of a PutRecordBatch call and updates
// the plugin's buffered records, byte count and timer accordingly.
//
// Outcomes:
//   - No failures: the timer is reset, the buffer is emptied, and FLB_OK is
//     returned.
//   - Every buffered record failed: the timer is started and FLB_RETRY is
//     returned together with an error; the buffer is left as-is.
//   - Some records failed: the buffer is rewritten to hold only the records
//     whose response entry carries an error message, dataLength is recomputed
//     from them, and FLB_OK is returned. If an entry reports
//     ServiceUnavailableException, the function instead returns FLB_RETRY
//     immediately with a nil error and does not modify the buffer.
func (output *OutputPlugin) processAPIResponse(response *firehose.PutRecordBatchOutput) (int, error) {
	failedCount := aws.Int64Value(response.FailedPutCount)

	// Nothing failed: the whole batch was accepted.
	if failedCount <= 0 {
		output.timer.Reset()
		output.records = output.records[:0]
		output.dataLength = 0
		return fluentbit.FLB_OK, nil
	}

	// Nothing succeeded: no progress was made, so start the timer.
	if failedCount == int64(len(output.records)) {
		output.timer.Start()
		return fluentbit.FLB_RETRY, fmt.Errorf("PutRecordBatch request returned with no records successfully recieved")
	}

	logrus.Warnf("[firehose %d] %d records failed to be delivered. Will retry.\n", output.PluginID, failedCount)

	// Collect the records that need to be sent again. Response entries are
	// matched to buffered records by index.
	// NOTE(review): assumes RequestResponses is no longer than output.records — confirm against caller.
	pending := make([]*firehose.Record, 0, failedCount)
	for idx, entry := range response.RequestResponses {
		if entry.ErrorMessage != nil {
			logrus.Debugf("[firehose %d] Record failed to send with error: %s\n", output.PluginID, aws.StringValue(entry.ErrorMessage))
			pending = append(pending, output.records[idx])
		}
		if aws.StringValue(entry.ErrorCode) == firehose.ErrCodeServiceUnavailableException {
			logrus.Warnf("[firehose %d] Throughput limits for the delivery stream may have been exceeded.", output.PluginID)
			// Bail out without touching output.records or output.dataLength.
			return fluentbit.FLB_RETRY, nil
		}
	}

	// Keep only the failed records, reusing the existing backing array, and
	// recompute the buffered byte count from what remains.
	output.records = append(output.records[:0], pending...)
	remaining := 0
	for _, rec := range output.records {
		remaining += len(rec.Data)
	}
	output.dataLength = remaining

	return fluentbit.FLB_OK, nil
}