public List<WorkUnit> getWorkunits(SourceState state)

in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java [161:284]


  @Override
  public List<WorkUnit> getWorkunits(final SourceState state) {

    this.metricContext = Instrumented.getMetricContext(state, CopySource.class);
    this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());

    try {

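      // Migrate any deprecated configuration keys to their current names before they are read.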
      DeprecationUtils
          .renameDeprecatedKeys(state, CopyConfiguration.MAX_COPY_PREFIX + "." + CopyResourcePool.ENTITIES_KEY,
              Lists.newArrayList(MAX_FILES_COPIED_KEY));

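      // Resolve the source and destination file systems and record their URIs for SLA event reporting.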
      final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state);
      final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0);
      state.setProp(SlaEventKeys.SOURCE_URI, sourceFs.getUri());
      state.setProp(SlaEventKeys.DESTINATION_URI, targetFs.getUri());

      log.info("Identified source file system at {} and target file system at {}.", sourceFs.getUri(),
          targetFs.getUri());

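      // Derive bin-packing limits: enforcing a minimum work unit weight caps each bin at
      // maxWorkUnitsPerMultiWorkUnit work units once bins are packed up to maxSizePerBin.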
      long maxSizePerBin = state.getPropAsLong(MAX_SIZE_MULTI_WORKUNITS, 0);
      long maxWorkUnitsPerMultiWorkUnit = state.getPropAsLong(MAX_WORK_UNITS_PER_BIN, 50);
      final long minWorkUnitWeight = Math.max(1, maxSizePerBin / maxWorkUnitsPerMultiWorkUnit);
      final Optional<CopyableFileWatermarkGenerator> watermarkGenerator =
          CopyableFileWatermarkHelper.getCopyableFileWatermarkGenerator(state);
      int maxThreads = state.getPropAsInt(MAX_CONCURRENT_LISTING_SERVICES, DEFAULT_MAX_CONCURRENT_LISTING_SERVICES);

      final CopyConfiguration copyConfiguration = CopyConfiguration.builder(targetFs, state.getProperties()).build();

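      // Build the event submitter and the dataset finder that discovers copyable datasets on the source.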
      this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, CopyConfiguration.COPY_PREFIX).build();
      DatasetsFinder<CopyableDatasetBase> datasetFinder = DatasetUtils
          .instantiateDatasetFinder(state.getProperties(), sourceFs, DEFAULT_DATASET_PROFILE_CLASS_KEY,
              this.eventSubmitter, state);

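      // Adapt the finder to an iterable one so datasets can be streamed rather than fully materialized.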
      IterableDatasetFinder<CopyableDatasetBase> iterableDatasetFinder =
          datasetFinder instanceof IterableDatasetFinder ? (IterableDatasetFinder<CopyableDatasetBase>) datasetFinder
              : new IterableDatasetFinderImpl<>(datasetFinder);

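      // Turn each discovered dataset into a copy requestor, skipping datasets that yield none.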
      Iterator<CopyableDatasetRequestor> requestorIteratorWithNulls = Iterators
          .transform(iterableDatasetFinder.getDatasetsIterator(),
              new CopyableDatasetRequestor.Factory(targetFs, copyConfiguration, log));
      Iterator<CopyableDatasetRequestor> requestorIterator =
          Iterators.filter(requestorIteratorWithNulls, Predicates.<CopyableDatasetRequestor>notNull());

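      // Work units are collected per file set; the multimap is synchronized because the
      // generators below populate it from multiple listing threads.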
      final SetMultimap<FileSet<CopyEntity>, WorkUnit> workUnitsMap =
          Multimaps.<FileSet<CopyEntity>, WorkUnit>synchronizedSetMultimap(
              HashMultimap.<FileSet<CopyEntity>, WorkUnit>create());

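      // Allocate and prioritize file set requests under the configured copy limits (e.g. maxToCopy).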
      RequestAllocator<FileSet<CopyEntity>> allocator = createRequestAllocator(copyConfiguration, maxThreads);
      Iterator<FileSet<CopyEntity>> prioritizedFileSets =
          allocator.allocateRequests(requestorIterator, copyConfiguration.getMaxToCopy());

      // Submit alertable events for unfulfilled requests, and fail the job if all allocated requests were rejected due to size.
      submitUnfulfilledRequestEvents(allocator);
      failJobIfAllRequestsRejected(allocator, prioritizedFileSets);

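      // Lazily build one work-unit-generation callable per prioritized file set.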
      String filesetWuGeneratorAlias =
          state.getProp(ConfigurationKeys.COPY_SOURCE_FILESET_WU_GENERATOR_CLASS, FileSetWorkUnitGenerator.class.getName());
      boolean shouldWuGeneratorFailureBeFatal =
          state.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_GENERATOR_FAILURE_IS_FATAL,
              ConfigurationKeys.DEFAULT_WORK_UNIT_FAST_FAIL_ENABLED);
      Iterator<Callable<Void>> callableIterator =
          Iterators.transform(prioritizedFileSets, new Function<FileSet<CopyEntity>, Callable<Void>>() {
            @Nullable
            @Override
            public Callable<Void> apply(FileSet<CopyEntity> input) {
              try {
                return GobblinConstructorUtils.<FileSetWorkUnitGenerator>invokeLongestConstructor(
                    new ClassAliasResolver(FileSetWorkUnitGenerator.class).resolveClass(filesetWuGeneratorAlias),
                    input.getDataset(), input, state, targetFs, workUnitsMap, watermarkGenerator, minWorkUnitWeight, lineageInfo);
              } catch (Exception e) {
                throw new RuntimeException("Cannot create workunits generator", e);
              }
            }
          });

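      // Run the generators on a bounded daemon-thread pool; a generator failure is fatal only when configured so.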
      try {
        List<Future<Void>> futures = new IteratorExecutor<>(callableIterator, maxThreads,
            ExecutorsUtils.newDaemonThreadFactory(Optional.of(log), Optional.of("Copy-file-listing-pool-%d")))
            .execute();

        for (Future<Void> future : futures) {
          try {
            future.get();
          } catch (ExecutionException exc) {
            log.error("Failed to get work units for dataset.", exc.getCause());
            if (shouldWuGeneratorFailureBeFatal) {
              throw new RuntimeException("Failed to get work units for dataset.", exc.getCause());
            }
          }
        }
      } catch (InterruptedException ie) {
        // Restore the interrupt status so callers can observe the interruption.
        Thread.currentThread().interrupt();
        log.error("Retrieval of work units was interrupted. Aborting.", ie);
        return Lists.newArrayList();
      }

      log.info(String.format("Created %s workunits ", workUnitsMap.size()));

      copyConfiguration.getCopyContext().logCacheStatistics();

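      // In simulate mode, log what each copy entity would do and return no work units.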
      if (state.contains(SIMULATE) && state.getPropAsBoolean(SIMULATE)) {
        log.info("Simulate mode enabled. Will not execute the copy.");
        for (Map.Entry<FileSet<CopyEntity>, Collection<WorkUnit>> entry : workUnitsMap.asMap().entrySet()) {
          log.info(String.format("Actions for dataset %s file set %s.", entry.getKey().getDataset().datasetURN(),
              entry.getKey().getName()));
          for (WorkUnit workUnit : entry.getValue()) {
            try {
              CopyEntity copyEntity = deserializeCopyEntity(workUnit);
              log.info(copyEntity.explain());
            } catch (Exception e) {
              log.info("Cannot deserialize CopyEntity from wu : {}", workUnit.toString());
            }
          }
        }
        return Lists.newArrayList();
      }

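      // Bin-pack the generated work units into multi-work-units bounded by maxSizePerBin.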
      List<? extends WorkUnit> workUnits = new WorstFitDecreasingBinPacking(maxSizePerBin)
          .pack(Lists.newArrayList(workUnitsMap.values()), this.weighter);
      log.info("Bin packed work units. Initial work units: {}, packed work units: {}, max weight per bin: {}, "
          + "max work units per bin: {}.", workUnitsMap.size(), workUnits.size(), maxSizePerBin,
          maxWorkUnitsPerMultiWorkUnit);
      return ImmutableList.copyOf(workUnits);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
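
For context, a minimal sketch of how a driver might exercise this method. The class name and the property keys below are assumptions made for illustration (real jobs set the source/writer file systems and simulate flag through the standard Gobblin job configuration); consult ConfigurationKeys and CopyConfiguration for the authoritative key names in your Gobblin version.

import java.util.List;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.data.management.copy.CopySource;
import org.apache.gobblin.source.workunit.WorkUnit;

// Hypothetical driver: build a SourceState, point it at illustrative file
// systems, and ask CopySource for its work-unit plan.
public class CopyPlanPreview {
  public static void main(String[] args) {
    SourceState state = new SourceState();
    state.setProp("source.filebased.fs.uri", "hdfs://source-nn:8020"); // assumed source FS key
    state.setProp("writer.fs.uri", "hdfs://target-nn:8020");           // assumed writer FS key
    state.setProp("gobblin.copy.simulate", true);                      // assumed simulate-mode key

    List<WorkUnit> workUnits = new CopySource().getWorkunits(state);   // empty in simulate mode
    System.out.println("Planned " + workUnits.size() + " work units");
  }
}

Because simulate mode logs each CopyEntity's explain() output and returns an empty list, a driver like this is a cheap way to preview what a copy job would do without moving any data.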