public Task()

in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java [182:276]


  /**
   * Instantiates a task from the given {@link TaskContext}.
   *
   * <p>The constructor copies the job id, task id and task key out of the context's task state, reads the
   * {@code TASK_IGNORE_CLOSE_FAILURES} (default {@code false}) and {@code TASK_INTERRUPT_ON_CANCEL}
   * (default {@code true}) flags from the job state, and registers the task broker, the instrumented
   * extractor, any {@link Closeable} record stream processors, the converter and the row-level policy
   * checker with a {@link Closer}. When {@link #isStreamingTask()} is true it additionally sets up the
   * watermark storage, tracker and manager; otherwise those three are left absent.
   *
   * <p>If instantiation fails after resources have been registered, everything registered with the closer is
   * closed on a best-effort basis before the exception propagates, so a rejected task does not leak the
   * extractor or other already-opened constructs.
   *
   * @param context source of the task state, extractor, converters, record stream processors, row checker
   *                and watermark storage
   * @param taskStateTracker tracker retained by this task
   * @param taskExecutor executor retained by this task
   * @param countDownLatch optional latch retained by this task
   * @throws TaskInstantiationException if both converters and record stream processors are configured, or if
   *         the task is a streaming task but the raw source extractor is not a {@link StreamingExtractor}
   * @throws RuntimeException if the row-level policy checker cannot be obtained from the context
   */
  public Task(TaskContext context, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor,
      Optional<CountDownLatch> countDownLatch) {
    this.taskContext = context;
    this.taskState = context.getTaskState();
    this.jobId = this.taskState.getJobId();
    this.taskId = this.taskState.getTaskId();
    this.taskKey = this.taskState.getTaskKey();
    this.isIgnoreCloseFailures = this.taskState.getJobState().getPropAsBoolean(ConfigurationKeys.TASK_IGNORE_CLOSE_FAILURES, false);
    this.shouldInterruptTaskOnCancel = this.taskState.getJobState().getPropAsBoolean(ConfigurationKeys.TASK_INTERRUPT_ON_CANCEL, true);
    this.taskStateTracker = taskStateTracker;
    this.taskExecutor = taskExecutor;
    this.countDownLatch = countDownLatch;
    this.closer = Closer.create();
    this.closer.register(this.taskState.getTaskBrokerNullable());
    this.extractor =
        closer.register(new InstrumentedExtractorDecorator<>(this.taskState, this.taskContext.getExtractor()));

    this.recordStreamProcessors = this.taskContext.getRecordStreamProcessors();

    // add record stream processors to closer if they are closeable
    for (RecordStreamProcessor r: recordStreamProcessors) {
      if (r instanceof Closeable) {
        this.closer.register((Closeable)r);
      }
    }

    List<Converter<?,?,?,?>> converters = this.taskContext.getConverters();

    this.converter = closer.register(new MultiConverter(converters));

    // can't have both record stream processors and converter lists configured
    if (!this.recordStreamProcessors.isEmpty() && !converters.isEmpty()) {
      closeResourcesAfterFailedInstantiation();
      throw new TaskInstantiationException("Converters cannot be specified when RecordStreamProcessors are specified");
    }

    try {
      this.rowChecker = closer.register(this.taskContext.getRowLevelPolicyChecker());
    } catch (Exception e) {
      closeResourcesAfterFailedInstantiation();
      throw new RuntimeException("Failed to instantiate row checker.", e);
    }

    this.taskMode = getExecutionModel(this.taskState);
    this.recordsPulled = new AtomicLong(0);
    this.lastRecordPulledTimestampMillis = 0;
    this.shutdownRequested = new AtomicBoolean(false);
    this.shutdownLatch = new CountDownLatch(1);

    // Setup Streaming constructs
    if (isStreamingTask()) {
      Extractor underlyingExtractor = this.taskContext.getRawSourceExtractor();
      if (!(underlyingExtractor instanceof StreamingExtractor)) {
        LOG.error(
            "Extractor {}  is not an instance of StreamingExtractor but the task is configured to run in continuous mode",
            underlyingExtractor.getClass().getName());
        // The extractor, converter and row checker are already registered with the closer at this point;
        // release them before rejecting the task, as the earlier failure paths do.
        closeResourcesAfterFailedInstantiation();
        throw new TaskInstantiationException("Extraction " + underlyingExtractor.getClass().getName()
            + " is not an instance of StreamingExtractor but the task is configured to run in continuous mode");
      }

      this.watermarkStorage = Optional.of(taskContext.getWatermarkStorage());
      Config config;
      try {
        config = ConfigUtils.propertiesToConfig(taskState.getProperties());
      } catch (Exception e) {
        // Deliberate best-effort: fall back to an empty config rather than failing the task.
        LOG.warn("Failed to deserialize taskState into Config.. continuing with an empty config", e);
        config = ConfigFactory.empty();
      }

      long commitIntervalMillis = ConfigUtils.getLong(config,
          TaskConfigurationKeys.STREAMING_WATERMARK_COMMIT_INTERVAL_MILLIS,
          TaskConfigurationKeys.DEFAULT_STREAMING_WATERMARK_COMMIT_INTERVAL_MILLIS);
      this.watermarkTracker = Optional.of(this.closer.register(new FineGrainedWatermarkTracker(config)));
      this.watermarkManager = Optional.of((WatermarkManager) this.closer.register(
          new TrackerBasedWatermarkManager(this.watermarkStorage.get(), this.watermarkTracker.get(),
              commitIntervalMillis, Optional.of(LOG))));

    } else {
      this.watermarkManager = Optional.absent();
      this.watermarkTracker = Optional.absent();
      this.watermarkStorage = Optional.absent();
    }
    this.taskEventMetadataGenerator = TaskEventMetadataUtils.getTaskEventMetadataGenerator(taskState);
  }

  /**
   * Closes everything registered with {@link #closer} after a failed instantiation. Close failures are logged
   * and not rethrown, so the instantiation error that triggered the cleanup is the one that propagates.
   */
  private void closeResourcesAfterFailedInstantiation() {
    try {
      this.closer.close();
    } catch (Throwable t) {
      LOG.error("Failed to close all open resources", t);
    }
  }