public static PipelineResult run()

in v2/dataplex/src/main/java/com/google/cloud/teleport/v2/templates/DataplexFileFormatConversion.java [263:385]


  public static PipelineResult run(
      Pipeline pipeline,
      FileFormatConversionOptions options,
      DataplexClient dataplex,
      OutputPathProvider outputPathProvider)
      throws IOException {
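    // The single input option is interpreted either as one Dataplex asset name or as a
    // comma-separated list of Dataplex entity names; anything matching neither pattern is
    // rejected up front.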
    boolean isInputAsset = ASSET_PATTERN.matcher(options.getInputAssetOrEntitiesList()).matches();
    if (!isInputAsset
        && !ENTITIES_PATTERN.matcher(options.getInputAssetOrEntitiesList()).matches()) {
      throw new IllegalArgumentException(
          "Either input asset or input entities list must be provided");
    }

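    // The output asset must already exist in Dataplex and must be backed by a Cloud Storage
    // bucket (resource spec type STORAGE_BUCKET); that bucket receives the converted files.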
    GoogleCloudDataplexV1Asset outputAsset = dataplex.getAsset(options.getOutputAsset());
    if (outputAsset == null
        || outputAsset.getResourceSpec() == null
        || !DataplexAssetResourceSpec.STORAGE_BUCKET
            .name()
            .equals(outputAsset.getResourceSpec().getType())
        || outputAsset.getResourceSpec().getName() == null) {
      throw new IllegalArgumentException(
          "Output asset must be an existing asset with resource spec name being a GCS bucket and"
              + " resource spec type of "
              + DataplexAssetResourceSpec.STORAGE_BUCKET.name());
    }
    String outputBucket = outputAsset.getResourceSpec().getName();

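    // The write disposition decides what happens when a converted file already exists in the
    // output bucket: OVERWRITE processes the input anyway, FAIL aborts the launch with a
    // WriteDispositionException, and SKIP silently filters the input file out.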
    Predicate<String> inputFilesFilter;
    switch (options.getWriteDisposition()) {
      case OVERWRITE:
        inputFilesFilter = inputFilePath -> true;
        break;
      case FAIL:
        Set<String> outputFilePaths = getAllOutputFilePaths(outputBucket);
        inputFilesFilter =
            inputFilePath -> {
              if (outputFilePaths.contains(
                  inputFilePathToOutputFilePath(
                      outputPathProvider,
                      inputFilePath,
                      outputBucket,
                      options.getOutputFileFormat()))) {
                throw new WriteDispositionException(
                    String.format(
                        "The file %s already exists in the output asset bucket: %s",
                        inputFilePath, outputBucket));
              }
              return true;
            };
        break;
      case SKIP:
        // Use a separate, effectively final local so the lambda below can capture it.
        Set<String> existingOutputFilePaths = getAllOutputFilePaths(outputBucket);
        inputFilesFilter =
            inputFilePath ->
                !existingOutputFilePaths.contains(
                    inputFilePathToOutputFilePath(
                        outputPathProvider,
                        inputFilePath,
                        outputBucket,
                        options.getOutputFileFormat()));
        break;
      default:
        throw new UnsupportedOperationException(
            "Unsupported existing file behaviour: " + options.getWriteDisposition());
    }

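    // Resolve the entities to convert: either every Cloud Storage entity under the given asset,
    // or exactly the entities listed by name.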
    ImmutableList<GoogleCloudDataplexV1Entity> entities =
        isInputAsset
            ? dataplex.getCloudStorageEntities(options.getInputAssetOrEntitiesList())
            : dataplex.getEntities(
                Splitter.on(',').trimResults().splitToList(options.getInputAssetOrEntitiesList()));

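    // Build the conversion graph: one ConvertFiles transform per input file that passes the
    // write-disposition filter. Entities (and partitions) that contribute at least one file are
    // remembered for the optional metadata update below.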
    List<EntityWithPartitions> processedEntities = new ArrayList<>();
    for (GoogleCloudDataplexV1Entity entity : entities) {
      ImmutableList<GoogleCloudDataplexV1Partition> partitions =
          dataplex.getPartitions(entity.getName());
      if (partitions.isEmpty()) {
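        // Non-partitioned entity: expand the entity's own data path into input files.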
        String outputPath = outputPathProvider.outputPathFrom(entity.getDataPath(), outputBucket);
        Iterator<String> inputFilePaths =
            getFilesFromFilePattern(entityToFileSpec(entity)).filter(inputFilesFilter).iterator();
        if (inputFilePaths.hasNext()) {
          processedEntities.add(new EntityWithPartitions(entity));
        }
        inputFilePaths.forEachRemaining(
            inputFilePath ->
                pipeline.apply(
                    "Convert " + shortenDataplexName(entity.getName()),
                    new ConvertFiles(entity, inputFilePath, options, outputPath)));
      } else {
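        // Partitioned entity: expand each partition's location separately so every partition
        // gets its own output path.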
        List<GoogleCloudDataplexV1Partition> processedPartitions = new ArrayList<>();
        for (GoogleCloudDataplexV1Partition partition : partitions) {
          String outputPath =
              outputPathProvider.outputPathFrom(partition.getLocation(), outputBucket);
          Iterator<String> inputFilePaths =
              getFilesFromFilePattern(partitionToFileSpec(partition))
                  .filter(inputFilesFilter)
                  .iterator();
          if (inputFilePaths.hasNext()) {
            processedPartitions.add(partition);
          }
          inputFilePaths.forEachRemaining(
              inputFilePath ->
                  pipeline.apply(
                      "Convert " + shortenDataplexName(partition.getName()),
                      new ConvertFiles(entity, inputFilePath, options, outputPath)));
        }
        if (!processedPartitions.isEmpty()) {
          processedEntities.add(new EntityWithPartitions(entity, processedPartitions));
        }
      }
    }

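    // Optionally propagate metadata to Dataplex for the entities and partitions that actually
    // had files converted.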
    if (options.getUpdateDataplexMetadata() && !processedEntities.isEmpty()) {
      updateDataplexMetadata(
          dataplex, options, processedEntities, outputPathProvider, outputBucket);
    }

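    // If nothing was selected for conversion, add a no-op transform so the pipeline graph is not
    // empty when it is launched.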
    if (processedEntities.isEmpty()) {
      pipeline.apply("Nothing to convert", new NoopTransform());
    }

    return pipeline.run();
  }
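
For context, a caller is expected to build the Beam pipeline and options itself and to supply the Dataplex client and the output-path mapping. The sketch below shows one way such a caller could be wired up; createDataplexClient and createOutputPathProvider are hypothetical placeholders for the template's actual wiring, which is not part of this excerpt.

  // Hypothetical caller sketch, not the template's actual entry point.
  // createDataplexClient and createOutputPathProvider are assumed helpers standing in for
  // whatever client construction and path mapping the real template uses.
  public static void main(String[] args) throws IOException {
    FileFormatConversionOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(FileFormatConversionOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    DataplexClient dataplex = createDataplexClient(options);
    OutputPathProvider outputPathProvider = createOutputPathProvider(options);

    PipelineResult result = run(pipeline, options, dataplex, outputPathProvider);
    result.waitUntilFinish(); // optional: block until the launched job completes
  }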