public void invoke(T value, Context context)

in LambdaSink/src/main/java/com/amazonaws/services/kinesisanalytics/AwsLambdaSink.java [126:182]


    public void invoke(T value, Context context) throws Exception {
        // Ensure the record is under the max Lambda payload size
        byte[] valueAsBytes = jsonParser.writeValueAsBytes(value); // Any JSON serialization exception is thrown here, before the record is buffered
        if (valueAsBytes.length > MAX_PAYLOAD_BYTES) {
            if (skipBadRecords) {
                LOG.warn("Skipping record with MD5 hash " + DigestUtils.md5Hex(valueAsBytes) + " as it exceeds max allowed lambda function payload size.");
                return;
            } else {
                throw new RuntimeException("Record with MD5 hash " + DigestUtils.md5Hex(valueAsBytes) + " exceeds max allowed lambda function payload size.");
            }
        }

        // Add new received record to the buffer
        bufferedRecords.add(value);
        this.buffedBytes += valueAsBytes.length;

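        // shouldPublish() presumably gates flushing on the buffered size and/or the time since the last publish (both are logged below).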
        if (shouldPublish()) {
            List<List<T>> batches = new ArrayList<>();
            int currentBatchIndex = 0;
            int recordsInCurrentBatch = 0;
            long bytesInCurrentBatch = 0;
            batches.add(new ArrayList<>());
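            // Walk the buffer and re-partition it into batches that respect both the record-count cap and the payload size limit.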
            for (T bufferedRecord : bufferedRecords) {
                String record = jsonParser.writeValueAsString(bufferedRecord);
                recordsInCurrentBatch++;
                bytesInCurrentBatch += record.getBytes().length;

                if (recordsInCurrentBatch > maxRecordsPerFunctionCall
                        || bytesInCurrentBatch > (MAX_PAYLOAD_BYTES - (bufferedRecords.size() * 2L) - 4)
                    // Each batch is sent as a JSON array, e.g. [{rec1},{rec2}], which adds the two
                    // enclosing brackets plus one separating comma per record. The check reserves
                    // 2 bytes per buffered record and 4 bytes for the brackets as a conservative allowance.
                ) {
                    batches.add(++currentBatchIndex, new ArrayList<>());
                    recordsInCurrentBatch = 1;
                    bytesInCurrentBatch = record.getBytes().length;
                }
                batches.get(currentBatchIndex).add(bufferedRecord);
            }
            LOG.info("Flushing " + batches.size() + " buffered batches; lastPublishTime: " + lastPublishTime + ", bufferedRecords: " + bufferedRecords.size() + ", buffedBytes: " + buffedBytes);

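            // Each batch is serialized to a JSON array and dispatched as its own asynchronous "Event"
            // (fire-and-forget) invocation; the returned Future is not awaited, so invocation failures are not observed here.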
            batches.parallelStream().forEach(batch -> {
                try {
                    awsLambdaAsync.invoke(new InvokeRequest()
                            .withFunctionName(functionName)
                            .withInvocationType(InvocationType.Event)
                            .withPayload(jsonParser.writeValueAsString(batch)));
                } catch (JsonProcessingException e) {
                    // Unreachable: every record in this batch was already serialized successfully above
                }
            });

            // Reset all once published
            bufferedRecords.clear();
            this.buffedBytes = 0;
            this.lastPublishTime = System.currentTimeMillis();
        }
    }
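
For reference, below is a minimal, self-contained sketch of the same batch-splitting idea. It is not this project's API: the class and method names (BatchSplitSketch, split) are illustrative, and it reserves 2 bytes for the array brackets plus 1 byte per separating comma rather than the more conservative per-record allowance used above. It partitions pre-serialized JSON records into batches that respect both a record-count cap and a payload byte limit.

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public final class BatchSplitSketch {

        /** Splits pre-serialized JSON records into batches bounded by record count and payload size. */
        static List<List<String>> split(List<String> jsonRecords, int maxRecordsPerBatch, long maxPayloadBytes) {
            List<List<String>> batches = new ArrayList<>();
            List<String> current = new ArrayList<>();
            long currentBytes = 0;

            for (String record : jsonRecords) {
                long recordBytes = record.getBytes(StandardCharsets.UTF_8).length;

                // Projected size once the batch is wrapped as a JSON array like [{rec1},{rec2}]:
                // 2 bytes for the enclosing brackets plus one comma per existing record.
                long projectedBytes = currentBytes + recordBytes + 2 + current.size();
                boolean full = current.size() >= maxRecordsPerBatch || projectedBytes > maxPayloadBytes;

                if (full && !current.isEmpty()) {
                    batches.add(current);
                    current = new ArrayList<>();
                    currentBytes = 0;
                }

                // Oversized single records are assumed to have been filtered earlier, as in the sink above.
                current.add(record);
                currentBytes += recordBytes;
            }

            if (!current.isEmpty()) {
                batches.add(current);
            }
            return batches;
        }
    }

Unlike the sink above, this sketch only partitions the records; actually sending each batch (for example as an asynchronous Lambda Event invocation) is left to the caller.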