in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java [182:276]
/**
 * Builds a {@link Task} from the supplied {@link TaskContext}.
 *
 * <p>Every closeable created during construction (the task broker, the instrumented extractor, any
 * {@link Closeable} record stream processors, the converter, the row-level policy checker and, when
 * {@code isStreamingTask()} is true, the watermark tracker and watermark manager) is registered with
 * {@link #closer}. Whenever one of the validation steps below aborts construction, the closer is closed
 * first: the half-built task is never handed to a caller, so nothing else could release those resources.
 *
 * @param context source of the task state, extractor, record stream processors, converters, row-level
 *        policy checker and watermark storage
 * @param taskStateTracker stored on the task; not otherwise used by this constructor
 * @param taskExecutor stored on the task; not otherwise used by this constructor
 * @param countDownLatch optional latch, stored on the task; not otherwise used by this constructor
 * @throws TaskInstantiationException if both converters and record stream processors are configured, or if
 *         {@code isStreamingTask()} is true but the raw source extractor is not a {@link StreamingExtractor}
 * @throws RuntimeException if obtaining the row-level policy checker from the context fails
 */
public Task(TaskContext context, TaskStateTracker taskStateTracker, TaskExecutor taskExecutor,
    Optional<CountDownLatch> countDownLatch) {
  this.taskContext = context;
  this.taskState = context.getTaskState();
  this.jobId = this.taskState.getJobId();
  this.taskId = this.taskState.getTaskId();
  this.taskKey = this.taskState.getTaskKey();
  this.isIgnoreCloseFailures =
      this.taskState.getJobState().getPropAsBoolean(ConfigurationKeys.TASK_IGNORE_CLOSE_FAILURES, false);
  this.shouldInterruptTaskOnCancel =
      this.taskState.getJobState().getPropAsBoolean(ConfigurationKeys.TASK_INTERRUPT_ON_CANCEL, true);
  this.taskStateTracker = taskStateTracker;
  this.taskExecutor = taskExecutor;
  this.countDownLatch = countDownLatch;

  this.closer = Closer.create();
  this.closer.register(this.taskState.getTaskBrokerNullable());
  this.extractor =
      closer.register(new InstrumentedExtractorDecorator<>(this.taskState, this.taskContext.getExtractor()));

  this.recordStreamProcessors = this.taskContext.getRecordStreamProcessors();

  // add record stream processors to closer if they are closeable
  for (RecordStreamProcessor r : recordStreamProcessors) {
    if (r instanceof Closeable) {
      this.closer.register((Closeable) r);
    }
  }

  List<Converter<?,?,?,?>> converters = this.taskContext.getConverters();

  this.converter = closer.register(new MultiConverter(converters));

  // can't have both record stream processors and converter lists configured
  try {
    Preconditions.checkState(this.recordStreamProcessors.isEmpty() || converters.isEmpty(),
        "Converters cannot be specified when RecordStreamProcessors are specified");
  } catch (IllegalStateException e) {
    closeResourcesOnInstantiationFailure();
    throw new TaskInstantiationException("Converters cannot be specified when RecordStreamProcessors are specified");
  }

  try {
    this.rowChecker = closer.register(this.taskContext.getRowLevelPolicyChecker());
  } catch (Exception e) {
    closeResourcesOnInstantiationFailure();
    throw new RuntimeException("Failed to instantiate row checker.", e);
  }

  this.taskMode = getExecutionModel(this.taskState);
  this.recordsPulled = new AtomicLong(0);
  this.lastRecordPulledTimestampMillis = 0;
  this.shutdownRequested = new AtomicBoolean(false);
  this.shutdownLatch = new CountDownLatch(1);

  // Setup Streaming constructs
  if (isStreamingTask()) {
    Extractor underlyingExtractor = this.taskContext.getRawSourceExtractor();
    if (!(underlyingExtractor instanceof StreamingExtractor)) {
      LOG.error(
          "Extractor {} is not an instance of StreamingExtractor but the task is configured to run in continuous mode",
          underlyingExtractor.getClass().getName());
      // The broker, extractor, processors, converter and row checker are already registered with the
      // closer at this point; release them before aborting, as the other failure paths above do.
      closeResourcesOnInstantiationFailure();
      throw new TaskInstantiationException("Extraction " + underlyingExtractor.getClass().getName()
          + " is not an instance of StreamingExtractor but the task is configured to run in continuous mode");
    }

    this.watermarkStorage = Optional.of(taskContext.getWatermarkStorage());
    Config config;
    try {
      config = ConfigUtils.propertiesToConfig(taskState.getProperties());
    } catch (Exception e) {
      // Best effort: an unparsable task state only costs us the configured values, the defaults still apply.
      LOG.warn("Failed to deserialize taskState into Config.. continuing with an empty config", e);
      config = ConfigFactory.empty();
    }

    long commitIntervalMillis = ConfigUtils.getLong(config,
        TaskConfigurationKeys.STREAMING_WATERMARK_COMMIT_INTERVAL_MILLIS,
        TaskConfigurationKeys.DEFAULT_STREAMING_WATERMARK_COMMIT_INTERVAL_MILLIS);
    this.watermarkTracker = Optional.of(this.closer.register(new FineGrainedWatermarkTracker(config)));
    this.watermarkManager = Optional.of((WatermarkManager) this.closer.register(
        new TrackerBasedWatermarkManager(this.watermarkStorage.get(), this.watermarkTracker.get(),
            commitIntervalMillis, Optional.of(this.LOG))));
  } else {
    this.watermarkManager = Optional.absent();
    this.watermarkTracker = Optional.absent();
    this.watermarkStorage = Optional.absent();
  }
  this.taskEventMetadataGenerator = TaskEventMetadataUtils.getTaskEventMetadataGenerator(taskState);
}

/**
 * Closes everything registered with {@link #closer} so far. Called only while construction is being aborted.
 *
 * <p>A failure to close is logged instead of thrown so that it cannot replace the exception that is
 * about to report why the task could not be instantiated.
 */
private void closeResourcesOnInstantiationFailure() {
  try {
    this.closer.close();
  } catch (Throwable t) {
    LOG.error("Failed to close all open resources", t);
  }
}