public void startTask()

in src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java [102:126]


    public void startTask(final String tableName, final TopicPartition topicPartition) {
        // check if the task is already started
        String writerKey =
                getWriterKey(topicPartition.topic(), topicPartition.partition(), tableName);
        if (writer.containsKey(writerKey)) {
            LOG.info("already start task");
        } else {
            String topic = topicPartition.topic();
            int partition = topicPartition.partition();
            LoadModel loadModel = dorisOptions.getLoadModel();
            DorisWriter dorisWriter =
                    LoadModel.COPY_INTO.equals(loadModel)
                            ? new CopyIntoWriter(
                                    tableName, topic, partition, dorisOptions, conn, connectMonitor)
                            : new StreamLoadWriter(
                                    tableName,
                                    topic,
                                    partition,
                                    dorisOptions,
                                    conn,
                                    connectMonitor);
            writer.put(writerKey, dorisWriter);
            metricsJmxReporter.start();
        }
    }