in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java [59:114]
public void createInternal() throws Exception {
final PluginRuntime pluginRuntime =
RuntimeService.plugin().isPresent() ? RuntimeService.plugin().get() : null;
if (pluginRuntime == null) {
throw new IllegalStateException("Plugin runtime is down");
}
REGISTERED_EXECUTOR_SERVICES.putIfAbsent(
taskId,
new ThreadPoolExecutor(
parallelism,
parallelism,
0L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(parallelism))); // TODO: thread name
final long creationTime = System.currentTimeMillis();
consumers = new PullSourceConsumer[parallelism];
for (int i = 0; i < parallelism; i++) {
consumers[i] =
new PullSourceConsumer(
(PullSource) pluginRuntime.constructSource(parameters), processorProducer);
try {
consumers[i].consumer().validate(new PipeParameterValidator(parameters));
consumers[i]
.consumer()
.customize(
parameters,
new CollectorSourceRuntimeConfiguration(taskId, creationTime, parallelism, i));
consumers[i].consumer().start();
} catch (final Exception e) {
try {
consumers[i].consumer().close();
} catch (final Exception ex) {
LOGGER.warn("Failed to close source on creation failure", ex);
throw e;
}
}
int finalI = i;
REGISTERED_EXECUTOR_SERVICES
.get(taskId)
.submit(
() -> {
while (dispatch.isRunning() && TaskStateEnum.RUNNING.equals(taskState)) {
try {
consumers[finalI].onScheduler();
} catch (final Exception e) {
LOGGER.warn("Failed to pull source", e);
}
dispatch.waitUntilRunningOrDropped();
}
});
}
}