public Dag compileFlow()

in gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java [211:290]


  /**
   * Compiles a {@link FlowSpec} into a {@link Dag} of {@link JobExecutionPlan}s by finding a path through the
   * current flow graph from the flow's source node to each of its destination nodes.
   *
   * <p>On any failure (unknown node, unauthorized data movement, no path, or a compilation exception) a
   * compilation error is recorded on the given {@code spec} and {@code null} is returned. On success the
   * spec's compilation errors are cleared.
   *
   * @param spec the spec to compile; must be a non-null {@link FlowSpec}
   * @return the merged job execution plan DAG, or {@code null} if compilation failed
   * @throws NullPointerException if {@code spec} is null
   * @throws IllegalArgumentException if {@code spec} is not a {@link FlowSpec}
   */
  public Dag<JobExecutionPlan> compileFlow(Spec spec) {
    Preconditions.checkNotNull(spec);
    Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowCompiler only accepts FlowSpecs");
    FlowGraph graph = this.flowGraph.get();
    long startTime = System.nanoTime();

    FlowSpec flowSpec = (FlowSpec) spec;
    String source = FlowConfigUtils.getDataNode(flowSpec.getConfig(), ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, this.dataNodeAliasMap);
    String destination = FlowConfigUtils.getDataNode(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, this.dataNodeAliasMap);

    DataNode sourceNode = graph.getNode(source);
    if (sourceNode == null) {
      flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", source));
      return null;
    }
    List<String> destNodeIds = FlowConfigUtils.getDataNodes(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, this.dataNodeAliasMap);
    List<DataNode> destNodes = destNodeIds.stream().map(graph::getNode).collect(Collectors.toList());
    if (destNodes.contains(null)) {
      flowSpec.addCompilationError(source, destination, String.format("Flowgraph does not have a node with id %s", destNodeIds.get(destNodes.indexOf(null))));
      return null;
    }
    log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination));

    List<FlowSpec> flowSpecs = splitFlowSpec(flowSpec);

    // Authorization depends only on the flow and its endpoints, not on the individual dataset specs produced
    // by splitFlowSpec, so it is checked once per destination. It runs before taking the read lock so that
    // external authorizer code is not executed while the lock is held.
    if (!authorizeDataMovement(flowSpec, sourceNode, destNodes, source, destination)) {
      return null;
    }

    Dag<JobExecutionPlan> jobExecutionPlanDag = new Dag<>(new ArrayList<>());
    // Acquire before entering try so the finally block never unlocks a lock that was not acquired.
    this.rwLock.readLock().lock();
    try {
      for (FlowSpec datasetFlowSpec : flowSpecs) {
        //Compute the path from source to destination.
        FlowGraphPath flowGraphPath = graph.findPath(datasetFlowSpec);
        if (flowGraphPath != null) {
          //Convert the path into a Dag of JobExecutionPlans.
          jobExecutionPlanDag = jobExecutionPlanDag.merge(flowGraphPath.asDag(this.config));
        }
      }

      if (jobExecutionPlanDag.isEmpty()) {
        Instrumented.markMeter(flowCompilationFailedMeter);
        String message = String.format("No path found from source: %s and destination: %s", source, destination);
        log.info(message);

        // Only add the generic "no path" error if no priority-0 error was already recorded for this flow.
        if (flowSpec.getCompilationErrors().stream().noneMatch(compilationError -> compilationError.errorPriority == 0)) {
          flowSpec.addCompilationError(source, destination, message);
        }
        return null;
      }
    } catch (PathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException | ReflectiveOperationException e) {
      Instrumented.markMeter(flowCompilationFailedMeter);
      String message = String.format("Exception encountered while compiling flow for source: %s and destination: %s, %s", source, destination, Throwables.getStackTraceAsString(e));
      log.error(message, e);
      flowSpec.addCompilationError(source, destination, message);
      return null;
    } finally {
      this.rwLock.readLock().unlock();
    }
    Instrumented.markMeter(flowCompilationSuccessFulMeter);
    Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
    // Clear compilation errors now that compilation is successful
    flowSpec.clearCompilationErrors();
    return jobExecutionPlanDag;
  }

  /**
   * Checks that moving data from {@code sourceNode} to every node in {@code destNodes} is authorized for the flow.
   *
   * <p>Stops at the first unauthorized destination, or at the first exception thrown by the authorizer. In either
   * case a compilation error is recorded on {@code flowSpec}, the caller-visible spec, and {@code false} is
   * returned. An authorizer exception also marks the compilation-failure meter.
   *
   * @param flowSpec the flow being compiled; compilation errors are recorded here
   * @param sourceNode the resolved source node
   * @param destNodes the resolved destination nodes
   * @param source the source identifier, used for error reporting
   * @param destination the destination identifier, used for error reporting
   * @return {@code true} if movement to every destination is authorized
   */
  private boolean authorizeDataMovement(FlowSpec flowSpec, DataNode sourceNode, List<DataNode> destNodes,
      String source, String destination) {
    for (DataNode destNode : destNodes) {
      long authStartTime = System.nanoTime();
      try {
        boolean authorized = this.dataMovementAuthorizer.isMovementAuthorized(flowSpec, sourceNode, destNode);
        Instrumented.updateTimer(dataAuthorizationTimer, System.nanoTime() - authStartTime, TimeUnit.NANOSECONDS);
        if (!authorized) {
          String message = String.format("Data movement is not authorized for flow: %s, source: %s, destination: %s",
              flowSpec.getUri().toString(), source, destination);
          log.error(message);
          flowSpec.addCompilationError(source, destination, message);
          return false;
        }
      } catch (Exception e) {
        Instrumented.markMeter(flowCompilationFailedMeter);
        log.error(String.format("Exception encountered while authorizing data movement for source: %s and destination: %s",
            source, destination), e);
        flowSpec.addCompilationError(source, destination, Throwables.getStackTraceAsString(e));
        return false;
      }
    }
    return true;
  }