private void runSynchronousModel()

in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java [420:546]


  private void runSynchronousModel() throws Exception {
    // Get the fork operator. By default IdentityForkOperator is used with a single branch.
    ForkOperator forkOperator = closer.register(this.taskContext.getForkOperator());
    forkOperator.init(this.taskState);
    int branches = forkOperator.getBranches(this.taskState);
    // Set fork.branches explicitly here so the rest of the task flow can pick it up
    this.taskState.setProp(ConfigurationKeys.FORK_BRANCHES_KEY, branches);

    // Extract, convert, and fork the source schema.
    Object schema = converter.convertSchema(extractor.getSchema(), this.taskState);
    List<Boolean> forkedSchemas = forkOperator.forkSchema(this.taskState, schema);
    if (forkedSchemas.size() != branches) {
      throw new ForkBranchMismatchException(String
          .format("Number of forked schemas [%d] is not equal to number of branches [%d]", forkedSchemas.size(),
              branches));
    }

    if (inMultipleBranches(forkedSchemas) && !(CopyHelper.isCopyable(schema))) {
      throw new CopyNotSupportedException(schema + " is not copyable");
    }

    RowLevelPolicyCheckResults rowResults = new RowLevelPolicyCheckResults();

    if (!areSingleBranchTasksSynchronous(this.taskContext) || branches > 1) {
      // Create one fork for each forked branch
      for (int i = 0; i < branches; i++) {
        if (forkedSchemas.get(i)) {
          AsynchronousFork fork = closer.register(
              new AsynchronousFork(this.taskContext, schema instanceof Copyable ? ((Copyable) schema).copy() : schema,
                  branches, i, this.taskMode));
          configureStreamingFork(fork);
          // Run the Fork
          this.forks.put(Optional.<Fork>of(fork), Optional.<Future<?>>of(this.taskExecutor.submit(fork)));
        } else {
          this.forks.put(Optional.<Fork>absent(), Optional.<Future<?>>absent());
        }
      }
    } else {
      SynchronousFork fork = closer.register(
          new SynchronousFork(this.taskContext, schema instanceof Copyable ? ((Copyable) schema).copy() : schema,
              branches, 0, this.taskMode));
      configureStreamingFork(fork);
      this.forks.put(Optional.<Fork>of(fork), Optional.<Future<?>>of(this.taskExecutor.submit(fork)));
    }

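    // Streaming tasks pull records with watermarks and ack them as each fork finishes; batch tasks
    // tolerate a bounded number of data-conversion errors before failing.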
    LOG.info("Task mode streaming = " + isStreamingTask());
    if (isStreamingTask()) {

      // Start the watermark tracker and manager
      if (this.watermarkTracker.isPresent()) {
        this.watermarkTracker.get().start();
      }
      this.watermarkManager.get().start();

      ((StreamingExtractor) this.taskContext.getRawSourceExtractor()).start(this.watermarkStorage.get());

      RecordEnvelope recordEnvelope;
      // Extract, convert, and fork one source record at a time.
      while ((recordEnvelope = extractor.readRecordEnvelope()) != null) {
        onRecordExtract();
        AcknowledgableWatermark ackableWatermark = new AcknowledgableWatermark(recordEnvelope.getWatermark());
        if (watermarkTracker.isPresent()) {
          watermarkTracker.get().track(ackableWatermark);
        }
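        // Each forked record takes an extra ack on the watermark; the ack() after the loop releases the
        // initial reference so the watermark is committed only once every fork has acked.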
        for (Object convertedRecord : converter.convertRecord(schema, recordEnvelope, this.taskState)) {
          processRecord(convertedRecord, forkOperator, rowChecker, rowResults, branches,
              ackableWatermark.incrementAck());
        }
        ackableWatermark.ack();
        if (shutdownRequested()) {
          extractor.shutdown();
        }
      }
    } else {
      RecordEnvelope record;
      // Extract, convert, and fork one source record at a time.
      long errRecords = 0;
      while ((record = extractor.readRecordEnvelope()) != null) {
        onRecordExtract();
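        // Tolerate up to TASK_SKIP_ERROR_RECORDS data-conversion failures; any other exception fails the task.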
        try {
          for (Object convertedRecord : converter.convertRecord(schema, record.getRecord(), this.taskState)) {
            processRecord(convertedRecord, forkOperator, rowChecker, rowResults, branches, null);
          }
        } catch (Exception e) {
          if (!(e instanceof DataConversionException) && !(e.getCause() instanceof DataConversionException)) {
            LOG.error("Processing record incurs an unexpected exception: ", e);
            throw new RuntimeException(e.getCause());
          }
          errRecords++;
          if (errRecords > this.taskState.getPropAsLong(TaskConfigurationKeys.TASK_SKIP_ERROR_RECORDS,
              TaskConfigurationKeys.DEFAULT_TASK_SKIP_ERROR_RECORDS)) {
            throw new RuntimeException(e);
          }
        }
        if (shutdownRequested()) {
          extractor.shutdown();
        }
      }
    }

    LOG.info("Extracted " + this.recordsPulled + " data records");
    LOG.info("Row quality checker finished with results: " + rowResults.getResults());

    this.taskState.setProp(ConfigurationKeys.EXTRACTOR_ROWS_EXTRACTED, this.recordsPulled);
    this.taskState.setProp(ConfigurationKeys.EXTRACTOR_ROWS_EXPECTED, extractor.getExpectedRecordCount());

    for (Optional<Fork> fork : this.forks.keySet()) {
      if (fork.isPresent()) {
        // Tell the fork that the main branch is completed and no new incoming data records should be expected
        fork.get().markParentTaskDone();
      }
    }

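    // Block until every fork has finished processing the records already handed to it.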
    for (Optional<Future<?>> forkFuture : this.forks.values()) {
      if (forkFuture.isPresent()) {
        try {
          long forkFutureStartTime = System.nanoTime();
          forkFuture.get().get();
          long forkDuration = System.nanoTime() - forkFutureStartTime;
          LOG.info("Task shutdown: Fork future reaped in {} millis", forkDuration / 1000000);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
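
A minimal, self-contained sketch of the fan-out pattern the branch loop above implements: the fork
operator yields a per-branch boolean mask, and a record is copied before hand-off only when more than
one branch will consume it (the inMultipleBranches(...) / CopyHelper.isCopyable(...) check). Plain
Java, not Gobblin's ForkOperator/Copyable API; CopyableRecord and forkRecord are hypothetical names.

import java.util.List;
import java.util.concurrent.BlockingQueue;

final class ForkMaskSketch {
  /** Hypothetical stand-in for a copyable record: each branch may mutate its own copy. */
  interface CopyableRecord {
    CopyableRecord copy();
  }

  /** Hand a record to every branch whose mask bit is true, copying only when it fans out. */
  static void forkRecord(CopyableRecord record, List<Boolean> branchMask,
      List<BlockingQueue<CopyableRecord>> branchQueues) throws InterruptedException {
    long receivingBranches = branchMask.stream().filter(Boolean::booleanValue).count();
    for (int i = 0; i < branchMask.size(); i++) {
      if (!branchMask.get(i)) {
        continue; // this branch was not selected for the record
      }
      // Copy when the record fans out to more than one branch; a single receiver can take it as-is.
      CopyableRecord handOff = receivingBranches > 1 ? record.copy() : record;
      branchQueues.get(i).put(handOff);
    }
  }
}

The streaming path's incrementAck()/ack() calls follow a reference-counting pattern: the record's
watermark should be committed only after the extract loop and every fork that received a derived
record have all acked. A sketch of that counting idea, with illustrative names rather than Gobblin's
AcknowledgableWatermark API:

import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative reference-counted ack: the watermark becomes committable once all acks arrive. */
final class CountedWatermarkSketch {
  private final AtomicInteger pendingAcks = new AtomicInteger(1); // reference held by the extract loop

  /** Take an extra reference for a record handed to a fork (cf. ackableWatermark.incrementAck()). */
  Runnable incrementAck() {
    pendingAcks.incrementAndGet();
    return this::ack;
  }

  /** Release one reference; called by each fork when done and once by the extract loop (cf. ack()). */
  void ack() {
    if (pendingAcks.decrementAndGet() == 0) {
      onFullyAcked();
    }
  }

  private void onFullyAcked() {
    // At this point the underlying watermark could be handed to the watermark manager for commit.
  }
}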