in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java [230:305]
/**
 * Hands every record currently held in {@code toSendRecord} to the RocketMQ producer
 * using asynchronous sends.
 *
 * <p>Each record is registered with {@code retryWithToleranceOperator}, run through
 * {@code transformChain}, mapped to a topic and converted to a {@link Message}. A record
 * whose conversion yields {@code null}, or for which the tolerance operator reports a
 * failure, is reported through {@code recordFailed} and counted as skipped. All other
 * records are submitted to the producer; the send outcome is handled in the
 * {@link SendCallback}.
 *
 * <p>If a send raises {@link RetriableException}, {@code toSendRecord} is narrowed to the
 * records that have not been handled yet (starting with the one whose send failed) so a
 * later call can resume from there.
 *
 * @return {@code true} when the loop finished for the whole batch (and {@code toSendRecord}
 *         was reset to {@code null}); {@code false} when a {@link RetriableException}
 *         stopped the batch early
 * @throws InterruptedException rethrown unchanged when caught around the producer send
 */
private Boolean sendRecord() throws InterruptedException {
    // Number of leading records of toSendRecord that have been fully handled
    // (either skipped or handed to the producer). Used to compute the retry sub-list.
    int processed = 0;
    final CalcSourceRecordWrite counter = new CalcSourceRecordWrite(toSendRecord.size(), sourceTaskMetricsGroup);
    for (ConnectRecord preTransformRecord : toSendRecord) {
        retryWithToleranceOperator.sourceRecord(preTransformRecord);
        ConnectRecord record = transformChain.doTransforms(preTransformRecord);
        String topic = maybeCreateAndGetTopic(record);
        Message sourceMessage = convertTransformedRecord(topic, record);
        if (sourceMessage == null || retryWithToleranceOperator.failed()) {
            // commit record
            recordFailed(preTransformRecord);
            counter.skipRecord();
            // A skipped record is handled too: count it, otherwise a later retry would
            // start the sub-list too early and re-send records that were already sent.
            processed++;
            continue;
        }
        log.trace("{} Appending record to the topic {} , value {}", this, topic, record.getData());
        // prepare to send record
        Optional<RecordOffsetManagement.SubmittedPosition> submittedRecordPosition = prepareToSendRecord(preTransformRecord);
        try {
            SendCallback callback = new SendCallback() {
                @Override
                public void onSuccess(SendResult result) {
                    log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic());
                    // complete record
                    counter.completeRecord();
                    // commit record for custom
                    recordSent(preTransformRecord, sourceMessage, result);
                    // ack record position
                    submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
                }

                @Override
                public void onException(Throwable throwable) {
                    log.error("Source task send record failed ,error msg {}. message {}", throwable.getMessage(), JSON.toJSONString(sourceMessage), throwable);
                    // skip record
                    counter.skipRecord();
                    // record send failed
                    recordSendFailed(false, sourceMessage, preTransformRecord, throwable);
                }
            };
            if (StringUtils.isEmpty(sourceMessage.getKeys())) {
                // No key: let the producer pick the queue (round robin)
                producer.send(sourceMessage, callback);
            } else {
                // Keyed message: select the queue by hashing the key so messages with the
                // same key go to the same queue. Ordered pulling of the data itself has to
                // be guaranteed by the sourceTask implementation.
                producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys(), callback);
            }
        } catch (RetriableException e) {
            log.warn("{} Failed to send record to topic '{}'. Backing off before retrying: ",
                this, sourceMessage.getTopic(), e);
            // Keep only the records that have not been handled yet, beginning with the
            // current one, so the next call continues from here.
            toSendRecord = toSendRecord.subList(processed, toSendRecord.size());
            // remove pre submit position, for retry
            submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::remove);
            // retry metrics
            counter.retryRemaining();
            return false;
        } catch (InterruptedException e) {
            log.error("Send message InterruptedException. message: {}, error info: {}.", sourceMessage, e);
            // throw e and stop task
            throw e;
        } catch (Exception e) {
            log.error("Send message MQClientException. message: {}, error info: {}.", sourceMessage, e);
            recordSendFailed(true, sourceMessage, preTransformRecord, e);
        }
        processed++;
    }
    toSendRecord = null;
    return true;
}