private Boolean sendRecord()

in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java [230:305]


    private Boolean sendRecord() throws InterruptedException {
        int processed = 0;

        final CalcSourceRecordWrite counter = new CalcSourceRecordWrite(toSendRecord.size(), sourceTaskMetricsGroup);
        for (ConnectRecord preTransformRecord : toSendRecord) {
            retryWithToleranceOperator.sourceRecord(preTransformRecord);
            ConnectRecord record = transformChain.doTransforms(preTransformRecord);
            String topic = maybeCreateAndGetTopic(record);
            Message sourceMessage = convertTransformedRecord(topic, record);
            if (sourceMessage == null || retryWithToleranceOperator.failed()) {
                // commit record
                recordFailed(preTransformRecord);
                counter.skipRecord();
                continue;
            }
            log.trace("{} Appending record to the topic {} , value {}", this, topic, record.getData());
            /**prepare to send record*/
            Optional<RecordOffsetManagement.SubmittedPosition> submittedRecordPosition = prepareToSendRecord(preTransformRecord);
            try {

                SendCallback callback = new SendCallback() {
                    @Override
                    public void onSuccess(SendResult result) {
                        log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic());
                        // complete record
                        counter.completeRecord();
                        // commit record for custom
                        recordSent(preTransformRecord, sourceMessage, result);
                        // ack record position
                        submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
                    }

                    @Override
                    public void onException(Throwable throwable) {

                        log.error("Source task send record failed ,error msg {}. message {}", throwable.getMessage(), JSON.toJSONString(sourceMessage), throwable);
                        // skip record
                        counter.skipRecord();
                        // record send failed
                        recordSendFailed(false, sourceMessage, preTransformRecord, throwable);
                    }
                };

                if (StringUtils.isEmpty(sourceMessage.getKeys())) {
                    // Round robin
                    producer.send(sourceMessage, callback);
                } else {
                    // Partition message ordering,
                    // At the same time, ensure that the data is pulled in an orderly manner, which needs to be guaranteed by sourceTask in the business
                    producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys(), callback);
                }

            } catch (RetriableException e) {
                log.warn("{} Failed to send record to topic '{}'. Backing off before retrying: ",
                    this, sourceMessage.getTopic(), e);
                // Intercepted as successfully sent, used to continue sending next time
                toSendRecord = toSendRecord.subList(processed, toSendRecord.size());
                // remove pre submit position, for retry
                submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::remove);
                // retry metrics
                counter.retryRemaining();
                return false;
            } catch (InterruptedException e) {
                log.error("Send message InterruptedException. message: {}, error info: {}.", sourceMessage, e);
                // throw e and stop task
                throw e;
            } catch (Exception e) {
                log.error("Send message MQClientException. message: {}, error info: {}.", sourceMessage, e);
                recordSendFailed(true, sourceMessage, preTransformRecord, e);
            }

            processed++;
        }
        toSendRecord = null;
        return true;
    }