func()

in kinesis/kinesis.go [519:563]


// processAPIResponse examines the outcome of a PutRecords call and rewrites
// the pending batch so that it holds only what still has to be sent.
//
// records and dataLength are updated in place:
//   - No failures reported: the plugin timer is reset, *records is truncated
//     to length zero and *dataLength is set to zero.
//   - Every record failed: the plugin timer is started and *records and
//     *dataLength are left untouched so the same batch can be retried.
//   - Some records failed: *records is replaced (reusing its backing array)
//     with the request entries whose matching response entry carries an
//     ErrorMessage, and *dataLength becomes the sum of len(Data) over them.
//
// The returned int is a fluentbit status code. It is FLB_RETRY when every
// record failed (also returning a non-nil error) or when at least one
// response entry has the ProvisionedThroughputExceededException error code;
// otherwise it is FLB_OK. The error is nil in all but the total-failure case.
func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int, response *kinesis.PutRecordsOutput) (int, error) {
	failedCount := aws.Int64Value(response.FailedRecordCount)

	// Request fully succeeded: nothing is left to send.
	if failedCount <= 0 {
		outputPlugin.timer.Reset()
		*records = (*records)[:0]
		*dataLength = 0
		return fluentbit.FLB_OK, nil
	}

	sentCount := len(*records)

	// All records failed, i.e. no progress has been made: start the timer and
	// keep the batch exactly as it was.
	if failedCount == int64(sentCount) {
		outputPlugin.timer.Start()
		return fluentbit.FLB_RETRY, fmt.Errorf("PutRecords request returned with no records successfully recieved")
	}

	logrus.Warnf("[kinesis %d] %d/%d records failed to be delivered. Will retry.\n", outputPlugin.PluginID, failedCount, sentCount)

	// Collect the request entries that need to be resent. Response entries
	// are matched to request entries by position.
	throttled := false
	retry := make([]*kinesis.PutRecordsRequestEntry, 0, failedCount)
	for idx, result := range response.Records {
		if result.ErrorMessage != nil {
			logrus.Debugf("[kinesis %d] Record failed to send with error: %s\n", outputPlugin.PluginID, aws.StringValue(result.ErrorMessage))
			retry = append(retry, (*records)[idx])
		}

		if aws.StringValue(result.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException {
			throttled = true
		}
	}

	if throttled {
		logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID)
	}

	// Overwrite the batch with the failed entries, reusing its backing array,
	// then recompute the payload size from what remains.
	pending := append((*records)[:0], retry...)
	*records = pending
	*dataLength = 0
	for _, entry := range pending {
		*dataLength += len(entry.Data)
	}

	if throttled {
		return fluentbit.FLB_RETRY, nil
	}
	return fluentbit.FLB_OK, nil
}