public void createInternal()

in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java [59:114]


  public void createInternal() throws Exception {
    final PluginRuntime pluginRuntime =
        RuntimeService.plugin().isPresent() ? RuntimeService.plugin().get() : null;
    if (pluginRuntime == null) {
      throw new IllegalStateException("Plugin runtime is down");
    }

    REGISTERED_EXECUTOR_SERVICES.putIfAbsent(
        taskId,
        new ThreadPoolExecutor(
            parallelism,
            parallelism,
            0L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(parallelism))); // TODO: thread name

    final long creationTime = System.currentTimeMillis();
    consumers = new PullSourceConsumer[parallelism];
    for (int i = 0; i < parallelism; i++) {
      consumers[i] =
          new PullSourceConsumer(
              (PullSource) pluginRuntime.constructSource(parameters), processorProducer);
      try {
        consumers[i].consumer().validate(new PipeParameterValidator(parameters));
        consumers[i]
            .consumer()
            .customize(
                parameters,
                new CollectorSourceRuntimeConfiguration(taskId, creationTime, parallelism, i));
        consumers[i].consumer().start();
      } catch (final Exception e) {
        try {
          consumers[i].consumer().close();
        } catch (final Exception ex) {
          LOGGER.warn("Failed to close source on creation failure", ex);
          throw e;
        }
      }

      int finalI = i;
      REGISTERED_EXECUTOR_SERVICES
          .get(taskId)
          .submit(
              () -> {
                while (dispatch.isRunning() && TaskStateEnum.RUNNING.equals(taskState)) {
                  try {
                    consumers[finalI].onScheduler();
                  } catch (final Exception e) {
                    LOGGER.warn("Failed to pull source", e);
                  }

                  dispatch.waitUntilRunningOrDropped();
                }
              });
    }
  }