public List<WorkUnit> getWorkunits(SourceState state)

in gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java [88:219]


  /**
   * Builds the work units for this run by diffing the current file system snapshot against the files
   * recorded by the previous run.
   *
   * <p>The method proceeds as follows:
   * <ol>
   *   <li>Initializes the logger, the lineage info and the file system helper.</li>
   *   <li>Loads the previous snapshot from the <em>first</em> previous work unit state. If previous states
   *       exist but that one carries no {@link ConfigurationKeys#SOURCE_FILEBASED_FS_SNAPSHOT} and
   *       {@link ConfigurationKeys#SOURCE_FILEBASED_FS_PRIOR_SNAPSHOT_REQUIRED} is enabled, the call fails.</li>
   *   <li>Re-queues the work units returned by {@code getPreviousWorkUnitsForRetry} and treats the files they
   *       list as already seen, so they are not scheduled a second time.</li>
   *   <li>Selects at most {@link ConfigurationKeys#SOURCE_FILEBASED_MAX_FILES_PER_RUN} files that are not in
   *       the previous snapshot, and records the "effective" snapshot (previously seen files still present
   *       plus the files selected now) on every returned work unit.</li>
   *   <li>Distributes the selected files over at most
   *       {@link ConfigurationKeys#SOURCE_MAX_NUMBER_OF_PARTITIONS} new work units.</li>
   * </ol>
   *
   * @param state the source state holding the job configuration and the previous work unit states
   * @return the retried work units followed by the work units created for this run; possibly empty
   * @throws NullPointerException if {@link ConfigurationKeys#EXTRACT_TABLE_TYPE_KEY} is not set
   * @throws IllegalArgumentException if the table type is not a {@code TableType} constant, or if the
   *         configured maximum number of partitions is not positive
   * @throws RuntimeException if the file system helper cannot be initialized, if a required prior snapshot
   *         is missing, or if file names must be preserved but a work unit would receive several files
   */
  public List<WorkUnit> getWorkunits(SourceState state) {
    initLogger(state);
    lineageInfo = LineageInfo.getLineageInfo(state.getBroker());

    try {
      initFileSystemHelper(state);
    } catch (FileBasedHelperException e) {
      // Throw explicitly (instead of the deprecated Throwables.propagate statement) so that both the
      // compiler and the reader can see that nothing below runs without an initialized helper.
      throw new RuntimeException(e);
    }

    log.info("Getting work units");
    String nameSpaceName = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY);
    String entityName = state.getProp(ConfigurationKeys.SOURCE_ENTITY);

    // An explicitly configured extract table name wins; otherwise fall back to the entity name.
    String extractTableName = state.getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY);
    if (Strings.isNullOrEmpty(extractTableName)) {
      extractTableName = entityName;
    }

    String tableTypeName = state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY);
    if (tableTypeName == null) {
      // Same exception type as the implicit dereference failure, but naming the missing key.
      throw new NullPointerException(
          String.format("Required property '%s' is not set", ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY));
    }
    // Locale.ROOT keeps the enum lookup independent of the JVM's default locale.
    TableType tableType = TableType.valueOf(tableTypeName.toUpperCase(java.util.Locale.ROOT));

    List<WorkUnitState> previousWorkunits = Lists.newArrayList(state.getPreviousWorkUnitStates());
    Set<String> prevFsSnapshot = Sets.newHashSet();

    // Get list of files seen in the previous run. Only the first previous work unit state is consulted.
    if (!previousWorkunits.isEmpty()) {
      WorkUnitState firstPreviousState = previousWorkunits.get(0);
      if (firstPreviousState.getWorkunit().contains(ConfigurationKeys.SOURCE_FILEBASED_FS_SNAPSHOT)) {
        prevFsSnapshot.addAll(
            firstPreviousState.getWorkunit().getPropAsSet(ConfigurationKeys.SOURCE_FILEBASED_FS_SNAPSHOT));
      } else if (state.getPropAsBoolean(ConfigurationKeys.SOURCE_FILEBASED_FS_PRIOR_SNAPSHOT_REQUIRED,
          ConfigurationKeys.DEFAULT_SOURCE_FILEBASED_FS_PRIOR_SNAPSHOT_REQUIRED)) {
        // If a previous job exists, there should be a snapshot property.  If not, we need to fail so that we
        // don't accidentally read files that have already been processed.
        throw new RuntimeException(String.format("No '%s' found on state of prior job",
            ConfigurationKeys.SOURCE_FILEBASED_FS_SNAPSHOT));
      }
    }

    List<WorkUnit> workUnits = Lists.newArrayList();
    List<WorkUnit> previousWorkUnitsForRetry = this.getPreviousWorkUnitsForRetry(state);
    log.info("Total number of work units from the previous failed runs: " + previousWorkUnitsForRetry.size());
    for (WorkUnit previousWorkUnitForRetry : previousWorkUnitsForRetry) {
      // Files already assigned to a retried work unit count as "seen" so they are not selected again below.
      prevFsSnapshot.addAll(previousWorkUnitForRetry.getPropAsSet(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL));
      workUnits.add(previousWorkUnitForRetry);
    }

    // Get list of files that need to be pulled
    List<String> currentFsSnapshot = this.getcurrentFsSnapshot(state);
    // The snapshot we want to save. This might not be the full snapshot if we don't pull all files.
    List<String> effectiveSnapshot = Lists.newArrayList();
    List<String> filesToPull = Lists.newArrayList();

    int maxFilesToPull = state.getPropAsInt(ConfigurationKeys.SOURCE_FILEBASED_MAX_FILES_PER_RUN, Integer.MAX_VALUE);
    if (currentFsSnapshot.size() > maxFilesToPull) {
      // if we're going to not pull all files, sort them lexicographically so there is some order in which they
      // are ingested. Note currentFsSnapshot.size > maxFilesToPull does not imply we will ignore some of them,
      // as we still have to diff against the previous snapshot. Just a quick check if it even makes sense to
      // sort the files.
      Collections.sort(currentFsSnapshot);
    }
    for (String file : currentFsSnapshot) {
      if (prevFsSnapshot.contains(file)) {
        effectiveSnapshot.add(file);
      } else if (filesToPull.size() < maxFilesToPull) {
        // The snapshot keeps the full entry; only the part before the split pattern is queued for pulling.
        filesToPull.add(file.split(this.splitPattern)[0]);
        effectiveSnapshot.add(file);
      }
      // Otherwise the per-run cap is reached: the file is left out of the snapshot and stays eligible
      // for a later run.
    }

    // The effective snapshot is final from here on, so serialize it once and reuse it for every work unit.
    String effectiveSnapshotProp = StringUtils.join(effectiveSnapshot, ",");

    // Update the snapshot from the previous run with the new files processed in this run
    // Otherwise a corrupt file could cause re-processing of already processed files
    for (WorkUnit workUnit : previousWorkUnitsForRetry) {
      workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_SNAPSHOT, effectiveSnapshotProp);
    }

    if (!filesToPull.isEmpty()) {
      logFilesToPull(filesToPull);

      // Default to one file per work unit; a configured maximum can only lower the number of partitions.
      int numPartitions = filesToPull.size();
      if (state.contains(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS)) {
        numPartitions =
            Math.min(state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS), filesToPull.size());
      }
      if (numPartitions <= 0) {
        throw new IllegalArgumentException("The number of partitions should be positive");
      }

      // Ceiling division, so that every file lands in some partition.
      int filesPerPartition = filesToPull.size() % numPartitions == 0 ? filesToPull.size() / numPartitions
          : filesToPull.size() / numPartitions + 1;

      boolean preserveFileName =
          state.getPropAsBoolean(ConfigurationKeys.SOURCE_FILEBASED_PRESERVE_FILE_NAME, false);

      // Distribute the files across the workunits
      for (int fileOffset = 0; fileOffset < filesToPull.size(); fileOffset += filesPerPartition) {
        /* Use extract table name to create extract
         *
         * We don't want to pass in the whole SourceState object just to avoid any side effect, because
         * the constructor with state argument has been deprecated for a long time. Here we selectively
         * chose the configuration needed for Extract constructor, to manually form a source state.
         */
        SourceState extractState = new SourceState();
        extractState.setProp(ConfigurationKeys.EXTRACT_ID_TIME_ZONE,
            state.getProp(ConfigurationKeys.EXTRACT_ID_TIME_ZONE, ConfigurationKeys.DEFAULT_EXTRACT_ID_TIME_ZONE));
        extractState.setProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY,
            state.getProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY, ConfigurationKeys.DEFAULT_EXTRACT_IS_FULL));
        Extract extract = new Extract(extractState, tableType, nameSpaceName, extractTableName);

        WorkUnit workUnit = WorkUnit.create(extract);

        // Eventually these setters should be integrated with framework support for generalized watermark handling
        workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_SNAPSHOT, effectiveSnapshotProp);

        // The last partition may be shorter than filesPerPartition.
        int endOffset = Math.min(fileOffset + filesPerPartition, filesToPull.size());
        List<String> partitionFilesToPull = filesToPull.subList(fileOffset, endOffset);
        workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
            StringUtils.join(partitionFilesToPull, ","));
        if (preserveFileName) {
          if (partitionFilesToPull.size() != 1) {
            throw new RuntimeException("Cannot preserve the file name if a workunit is given multiple files");
          }
          // With exactly one file in the work unit, that file's path doubles as the publisher's final dir.
          workUnit.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR,
              workUnit.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL));
        }

        workUnits.add(workUnit);
      }

      log.info("Total number of work units for the current run: " + (workUnits.size() - previousWorkUnitsForRetry.size()));
    }

    addLineageSourceInfo(workUnits, state);
    return workUnits;
  }