in src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java [102:126]
/**
 * Ensures a writer is registered for the given table and topic partition.
 *
 * <p>The writer is identified by the key built from the topic, the partition and the table
 * name. If a writer is already stored under that key, the call only logs and returns.
 * Otherwise a {@code CopyIntoWriter} is created when the configured load model is
 * {@code COPY_INTO}, or a {@code StreamLoadWriter} for any other load model; the new writer is
 * stored under the key and {@code metricsJmxReporter.start()} is invoked.
 *
 * @param tableName table name passed to the new writer and used as part of the writer key
 * @param topicPartition topic and partition the writer is created for
 */
public void startTask(final String tableName, final TopicPartition topicPartition) {
    final String topic = topicPartition.topic();
    final int partition = topicPartition.partition();
    final String key = getWriterKey(topic, partition, tableName);

    // A writer is already registered under this key: nothing to create.
    if (writer.containsKey(key)) {
        LOG.info("already start task");
        return;
    }

    // The load model decides which writer implementation backs this partition.
    final DorisWriter created;
    if (LoadModel.COPY_INTO.equals(dorisOptions.getLoadModel())) {
        created =
                new CopyIntoWriter(
                        tableName, topic, partition, dorisOptions, conn, connectMonitor);
    } else {
        created =
                new StreamLoadWriter(
                        tableName, topic, partition, dorisOptions, conn, connectMonitor);
    }

    writer.put(key, created);
    metricsJmxReporter.start();
}