integrations/flink_connector/src/main/java/com/amazonaws/services/timestream/TimestreamSink.java [87:155]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        bufferedRecords.add(measure);

        if(shouldPublish()) {
            WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest()
                    .withDatabaseName(this.db)
                    .withTableName(this.table)
                    .withRecords(bufferedRecords);

            try {
                WriteRecordsResult writeRecordsResult = this.writeClient.writeRecords(writeRecordsRequest);
                LOG.debug("writeRecords Status: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode());
                bufferedRecords.clear();
                emptyListTimetamp = System.currentTimeMillis();

            }   catch (RejectedRecordsException e){
                List<RejectedRecord> rejectedRecords = e.getRejectedRecords();
                LOG.warn("Rejected Records -> " + rejectedRecords.size());

                for (int i = rejectedRecords.size()-1 ; i >= 0 ; i-- ) {

                    LOG.warn("Discarding Malformed Record ->" + rejectedRecords.get(i).toString());
                    LOG.warn("Rejected Record Reason ->" + 	rejectedRecords.get(i).getReason());
                    bufferedRecords.remove(rejectedRecords.get(i).getRecordIndex());

                }
            }   catch (Exception e) {
                LOG.error("Error: " + e);
            }
        }
    }

    /**
     * Decides whether the accumulated batch of records should be written to Timestream.
     *
     * @return {@code true} when the buffer has reached (or exceeded) the configured
     *         batch size, or when records have been accumulating for at least
     *         {@code RECORDS_FLUSH_INTERVAL_MILLISECONDS} since the buffer was last emptied.
     */
    private boolean shouldPublish() {
        // Use >= rather than ==: a failed publish (the broad catch in invoke leaves the
        // buffer intact) or a checkpoint restore can push the buffer past batchSize,
        // after which an exact-equality check would never fire again and only the
        // time-based trigger would flush records.
        if (bufferedRecords.size() >= batchSize) {
            LOG.debug("Batch of size " + bufferedRecords.size() + " should get published");
            return true;
        } else if (System.currentTimeMillis() - emptyListTimetamp >= RECORDS_FLUSH_INTERVAL_MILLISECONDS) {
            LOG.debug("Records after flush interval should get published");
            return true;
        }
        return false;
    }

    @Override
    public void close() throws Exception {
        // Delegates cleanup to the parent sink. NOTE(review): records still sitting in
        // bufferedRecords are NOT flushed here — they survive restarts only via
        // snapshotState/initializeState. Consider a final best-effort write on
        // graceful shutdown; confirm against the job's stop semantics.
        super.close();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        // Replace the previous checkpoint contents with the records currently buffered,
        // so measures that have not yet been published survive a failure/restart.
        checkpointedState.clear();
        checkpointedState.addAll(bufferedRecords);
    }

    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        ListStateDescriptor<Record> descriptor =
                new ListStateDescriptor<>("recordList",
                        Record.class);

        checkpointedState = functionInitializationContext.getOperatorStateStore().getListState(descriptor);

        if (functionInitializationContext.isRestored()) {
            for (Record element : checkpointedState.get()) {
                bufferedRecords.add(element);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



integrations/flink_connector_with_upserts/src/main/java/com/amazonaws/services/timestream/TimestreamSink.java [92:160]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        bufferedRecords.add(measure);

        if(shouldPublish()) {
            WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest()
                    .withDatabaseName(this.db)
                    .withTableName(this.table)
                    .withRecords(bufferedRecords);

            try {
                WriteRecordsResult writeRecordsResult = this.writeClient.writeRecords(writeRecordsRequest);
                LOG.debug("writeRecords Status: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode());
                bufferedRecords.clear();
                emptyListTimetamp = System.currentTimeMillis();

            }   catch (RejectedRecordsException e){
                List<RejectedRecord> rejectedRecords = e.getRejectedRecords();
                LOG.warn("Rejected Records -> " + rejectedRecords.size());

                for (int i = rejectedRecords.size()-1 ; i >= 0 ; i-- ) {

                    LOG.warn("Discarding Malformed Record ->" + rejectedRecords.get(i).toString());
                    LOG.warn("Rejected Record Reason ->" + 	rejectedRecords.get(i).getReason());
                    bufferedRecords.remove(rejectedRecords.get(i).getRecordIndex());

                }
            }   catch (Exception e) {
                LOG.error("Error: " + e);
            }
        }
    }

    /**
     * Decides whether the accumulated batch of records should be written to Timestream.
     *
     * @return {@code true} when the buffer has reached (or exceeded) the configured
     *         batch size, or when records have been accumulating for at least
     *         {@code RECORDS_FLUSH_INTERVAL_MILLISECONDS} since the buffer was last emptied.
     */
    private boolean shouldPublish() {
        // Use >= rather than ==: a failed publish (the broad catch in invoke leaves the
        // buffer intact) or a checkpoint restore can push the buffer past batchSize,
        // after which an exact-equality check would never fire again and only the
        // time-based trigger would flush records.
        if (bufferedRecords.size() >= batchSize) {
            LOG.debug("Batch of size " + bufferedRecords.size() + " should get published");
            return true;
        } else if (System.currentTimeMillis() - emptyListTimetamp >= RECORDS_FLUSH_INTERVAL_MILLISECONDS) {
            LOG.debug("Records after flush interval should get published");
            return true;
        }
        return false;
    }

    @Override
    public void close() throws Exception {
        // Delegates cleanup to the parent sink. NOTE(review): records still sitting in
        // bufferedRecords are NOT flushed here — they survive restarts only via
        // snapshotState/initializeState. Consider a final best-effort write on
        // graceful shutdown; confirm against the job's stop semantics.
        super.close();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        // Replace the previous checkpoint contents with the records currently buffered,
        // so measures that have not yet been published survive a failure/restart.
        checkpointedState.clear();
        checkpointedState.addAll(bufferedRecords);
    }

    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        ListStateDescriptor<Record> descriptor =
                new ListStateDescriptor<>("recordList",
                        Record.class);

        checkpointedState = functionInitializationContext.getOperatorStateStore().getListState(descriptor);

        if (functionInitializationContext.isRestored()) {
            for (Record element : checkpointedState.get()) {
                bufferedRecords.add(element);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



