in v2/dataplex/src/main/java/com/google/cloud/teleport/v2/templates/DataplexFileFormatConversion.java [263:385]
/**
 * Assembles the file format conversion pipeline for the requested Dataplex input and runs it.
 *
 * <p>The method proceeds in this order:
 *
 * <ol>
 *   <li>Checks that the input option matches {@code ASSET_PATTERN} or {@code ENTITIES_PATTERN}.
 *   <li>Fetches the output asset and checks that it has a resource spec of type {@code
 *       STORAGE_BUCKET} with a non-null name; that name is used as the output bucket.
 *   <li>Builds a filter over input file paths according to the write disposition: {@code
 *       OVERWRITE} accepts every file, {@code FAIL} throws when the corresponding output file is
 *       already present, {@code SKIP} drops files whose output is already present.
 *   <li>For every entity (and every partition of a partitioned entity) applies one {@code
 *       ConvertFiles} transform per accepted input file.
 *   <li>Optionally updates Dataplex metadata for the entities that had at least one accepted
 *       file, adds a no-op transform when nothing was accepted, and runs the pipeline.
 * </ol>
 *
 * @param pipeline the pipeline the conversion transforms are applied to
 * @param options the conversion options (input, output asset, output format, write disposition)
 * @param dataplex the client used to look up assets, entities and partitions
 * @param outputPathProvider maps input locations to output locations inside the output bucket
 * @return the result of {@code pipeline.run()}
 * @throws IllegalArgumentException if the input option matches neither pattern, or the output
 *     asset is missing or is not a storage bucket asset with a resource name
 * @throws UnsupportedOperationException if the write disposition is not one of the handled values
 * @throws WriteDispositionException with the {@code FAIL} disposition, when the filter meets an
 *     input file whose output file already exists
 * @throws IOException propagated from the calls made here (presumably the Dataplex lookups and
 *     file listing; confirm against their declarations)
 */
public static PipelineResult run(
    Pipeline pipeline,
    FileFormatConversionOptions options,
    DataplexClient dataplex,
    OutputPathProvider outputPathProvider)
    throws IOException {
  // The input must look like either a single asset or a list of entities.
  boolean isInputAsset = ASSET_PATTERN.matcher(options.getInputAssetOrEntitiesList()).matches();
  boolean inputRecognized =
      isInputAsset || ENTITIES_PATTERN.matcher(options.getInputAssetOrEntitiesList()).matches();
  if (!inputRecognized) {
    throw new IllegalArgumentException(
        "Either input asset or input entities list must be provided");
  }

  // The output asset must exist and be backed by a storage bucket with a known name.
  GoogleCloudDataplexV1Asset outputAsset = dataplex.getAsset(options.getOutputAsset());
  boolean outputAssetUsable =
      outputAsset != null
          && outputAsset.getResourceSpec() != null
          && DataplexAssetResourceSpec.STORAGE_BUCKET
              .name()
              .equals(outputAsset.getResourceSpec().getType())
          && outputAsset.getResourceSpec().getName() != null;
  if (!outputAssetUsable) {
    throw new IllegalArgumentException(
        "Output asset must be an existing asset with resource spec name being a GCS bucket"
            + " and resource spec type of "
            + DataplexAssetResourceSpec.STORAGE_BUCKET.name());
  }
  String outputBucket = outputAsset.getResourceSpec().getName();

  // Decide which input files are eligible. Each case that needs the existing output listing
  // keeps it in its own block-scoped final local, which the lambda then captures.
  Predicate<String> inputFilesFilter;
  switch (options.getWriteDisposition()) {
    case OVERWRITE:
      inputFilesFilter = candidate -> true;
      break;
    case FAIL:
      {
        final Set<String> existingOutputs = getAllOutputFilePaths(outputBucket);
        inputFilesFilter =
            candidate -> {
              boolean alreadyWritten =
                  existingOutputs.contains(
                      inputFilePathToOutputFilePath(
                          outputPathProvider,
                          candidate,
                          outputBucket,
                          options.getOutputFileFormat()));
              if (!alreadyWritten) {
                return true;
              }
              // The check runs whenever the filter is evaluated, not up front.
              throw new WriteDispositionException(
                  String.format(
                      "The file %s already exists in the output asset bucket: %s",
                      candidate, outputBucket));
            };
        break;
      }
    case SKIP:
      {
        final Set<String> existingOutputs = getAllOutputFilePaths(outputBucket);
        inputFilesFilter =
            candidate ->
                !existingOutputs.contains(
                    inputFilePathToOutputFilePath(
                        outputPathProvider,
                        candidate,
                        outputBucket,
                        options.getOutputFileFormat()));
        break;
      }
    default:
      throw new UnsupportedOperationException(
          "Unsupported existing file behaviour: " + options.getWriteDisposition());
  }

  // Resolve the entities to convert, either from the asset or from the explicit list.
  ImmutableList<GoogleCloudDataplexV1Entity> entities;
  if (isInputAsset) {
    entities = dataplex.getCloudStorageEntities(options.getInputAssetOrEntitiesList());
  } else {
    entities =
        dataplex.getEntities(
            Splitter.on(',').trimResults().splitToList(options.getInputAssetOrEntitiesList()));
  }

  // Only entities (and partitions) with at least one accepted file are recorded here.
  List<EntityWithPartitions> processedEntities = new ArrayList<>();
  for (GoogleCloudDataplexV1Entity entity : entities) {
    ImmutableList<GoogleCloudDataplexV1Partition> partitions =
        dataplex.getPartitions(entity.getName());

    if (!partitions.isEmpty()) {
      // Partitioned entity: files are listed and converted partition by partition.
      List<GoogleCloudDataplexV1Partition> convertedPartitions = new ArrayList<>();
      for (GoogleCloudDataplexV1Partition partition : partitions) {
        String partitionOutputPath =
            outputPathProvider.outputPathFrom(partition.getLocation(), outputBucket);
        Iterator<String> pendingFiles =
            getFilesFromFilePattern(partitionToFileSpec(partition))
                .filter(inputFilesFilter)
                .iterator();
        if (pendingFiles.hasNext()) {
          convertedPartitions.add(partition);
        }
        while (pendingFiles.hasNext()) {
          String inputFilePath = pendingFiles.next();
          pipeline.apply(
              "Convert " + shortenDataplexName(partition.getName()),
              new ConvertFiles(entity, inputFilePath, options, partitionOutputPath));
        }
      }
      if (!convertedPartitions.isEmpty()) {
        processedEntities.add(new EntityWithPartitions(entity, convertedPartitions));
      }
      continue;
    }

    // Entity without partitions: files come from the entity's own file spec.
    String entityOutputPath =
        outputPathProvider.outputPathFrom(entity.getDataPath(), outputBucket);
    Iterator<String> pendingFiles =
        getFilesFromFilePattern(entityToFileSpec(entity)).filter(inputFilesFilter).iterator();
    if (pendingFiles.hasNext()) {
      processedEntities.add(new EntityWithPartitions(entity));
    }
    while (pendingFiles.hasNext()) {
      String inputFilePath = pendingFiles.next();
      pipeline.apply(
          "Convert " + shortenDataplexName(entity.getName()),
          new ConvertFiles(entity, inputFilePath, options, entityOutputPath));
    }
  }

  // Metadata is updated before the pipeline is run, and only if something was scheduled.
  boolean metadataUpdateRequested = options.getUpdateDataplexMetadata();
  if (metadataUpdateRequested && !processedEntities.isEmpty()) {
    updateDataplexMetadata(
        dataplex, options, processedEntities, outputPathProvider, outputBucket);
  }

  // With no accepted files, a placeholder transform is applied before running.
  if (processedEntities.isEmpty()) {
    pipeline.apply("Nothing to convert", new NoopTransform());
  }
  return pipeline.run();
}