in LambdaSink/src/main/java/com/amazonaws/services/kinesisanalytics/AwsLambdaSink.java [126:182]
/**
 * Buffers one incoming record and, once {@code shouldPublish()} reports that a flush is due,
 * sends everything buffered to the configured Lambda function as one or more JSON-array payloads.
 *
 * <p>Records are grouped so that each batch holds at most {@code maxRecordsPerFunctionCall}
 * records and its estimated serialized size (records + array brackets + separators) does not
 * exceed {@code MAX_PAYLOAD_BYTES}. After all batches have been handed to the client, the buffer,
 * the buffered byte counter and the last publish time are reset.
 *
 * @param value   the record to buffer; it is serialized with {@code jsonParser} to measure its size
 * @param context sink context supplied by the caller; not used by this method
 * @throws Exception if the record cannot be serialized, if the record is too large and
 *                   {@code skipBadRecords} is false (a {@link RuntimeException}), if a batch
 *                   cannot be serialized (an {@link IllegalStateException}), or if the Lambda
 *                   client call fails
 */
public void invoke(T value, Context context) throws Exception {
    // A batch is sent as a JSON array: '[' + ']' cost 2 bytes, each ',' between records 1 byte.
    // NOTE(review): assumes jsonParser emits compact JSON (no pretty printing) and that a record
    // serializes identically on its own and as a list element - confirm the mapper configuration.
    final long arrayBracketBytes = 2;
    final long separatorBytes = 1;

    // Serialize up front so that any JSON failure surfaces before the record is buffered.
    byte[] valueAsBytes = jsonParser.writeValueAsBytes(value);

    // Even alone in a batch the record is wrapped in brackets, so count them against the limit.
    if (valueAsBytes.length + arrayBracketBytes > MAX_PAYLOAD_BYTES) {
        if (skipBadRecords) {
            LOG.warn("Skipping record with MD5 hash " + DigestUtils.md5Hex(valueAsBytes) + " as it exceeds max allowed lambda function payload size.");
            return;
        }
        throw new RuntimeException("Record with MD5 hash " + DigestUtils.md5Hex(valueAsBytes) + " exceeds max allowed lambda function payload size.");
    }

    // Add the newly received record to the buffer.
    bufferedRecords.add(value);
    this.buffedBytes += valueAsBytes.length;

    if (!shouldPublish()) {
        return;
    }

    // Split the buffer into batches that respect both the record-count and payload-size limits.
    List<List<T>> batches = new ArrayList<>();
    List<T> currentBatch = new ArrayList<>();
    long currentPayloadBytes = arrayBracketBytes;
    for (T bufferedRecord : bufferedRecords) {
        // writeValueAsBytes yields the UTF-8 size, independent of the platform default charset.
        long recordBytes = jsonParser.writeValueAsBytes(bufferedRecord).length;
        long pendingSeparator = currentBatch.isEmpty() ? 0 : separatorBytes;

        // Never roll over an empty batch: that would emit an empty "[]" payload.
        boolean batchIsFull = !currentBatch.isEmpty()
                && (currentBatch.size() >= maxRecordsPerFunctionCall
                || currentPayloadBytes + pendingSeparator + recordBytes > MAX_PAYLOAD_BYTES);
        if (batchIsFull) {
            batches.add(currentBatch);
            currentBatch = new ArrayList<>();
            currentPayloadBytes = arrayBracketBytes;
            pendingSeparator = 0;
        }

        currentBatch.add(bufferedRecord);
        currentPayloadBytes += pendingSeparator + recordBytes;
    }
    if (!currentBatch.isEmpty()) {
        batches.add(currentBatch);
    }

    LOG.info("Flushing " + batches.size() + " buffered batches lastPublishTime: " + lastPublishTime + ", bufferedRecords: " + bufferedRecords.size() + ", buffedBytes: " + buffedBytes);

    // One Lambda call per batch, issued from a parallel stream, using the Event invocation type.
    batches.parallelStream().forEach(batch -> {
        try {
            awsLambdaAsync.invoke(new InvokeRequest()
                    .withFunctionName(functionName)
                    .withInvocationType(InvocationType.Event)
                    .withPayload(jsonParser.writeValueAsString(batch)));
        } catch (JsonProcessingException e) {
            // Every record was already serialized successfully above, so this is not expected.
            // Fail loudly rather than drop the batch, because the buffer is cleared afterwards.
            throw new IllegalStateException("Failed to serialize a batch of " + batch.size()
                    + " records for lambda function " + functionName, e);
        }
    });

    // Reset all buffering state once published.
    bufferedRecords.clear();
    this.buffedBytes = 0;
    this.lastPublishTime = System.currentTimeMillis();
}