in src/main/java/com/aliyun/dts/subscribe/clients/DefaultDTSConsumer.java [27:63]
public void start() {
//check firstly
boolean checkResult = check();
if (!checkResult) {
log.error("DTS precheck failed, dts consumer exit.");
throw new CriticalException("DTS precheck failed, dts consumer exit.");
}
synchronized (this) {
initLog4j();
if (started) {
throw new IllegalStateException("The client has already been started");
}
KafkaRecordFetcher recordFetcher = new KafkaRecordFetcher(consumerContext, toProcessRecords);
UserRecordGenerator userRecordGenerator = new UserRecordGenerator(consumerContext, toProcessRecords, defaultUserRecords,
(tp, timestamp, offset, metadata) -> recordFetcher.setToCommitCheckpoint(new Checkpoint(tp, timestamp, offset, metadata)));
//processor
EtlRecordProcessor etlRecordProcessor = new EtlRecordProcessor(consumerContext, defaultUserRecords, recordListeners);
List<WorkThread> startStream = startWorker(etlRecordProcessor, userRecordGenerator, recordFetcher);
while (!consumerContext.isExited()) {
sleepMS(1000);
}
log.info("DTS Consumer: shutting down...");
for (WorkThread workThread : startStream) {
workThread.stop();
}
started = true;
}
}