in src/main/java/com/amazonaws/services/neptune/export/NeptuneExportService.java [107:249]
/**
 * Runs a Neptune export for the command line held in {@code cmd} and returns the
 * result reported by the S3 export event handler.
 *
 * <p>The method proceeds in the following order:
 * <ol>
 *   <li>Parses {@code cmd} into {@link Args}. The config-file and queries options are removed
 *       when the corresponding S3 path is configured. For the export and create-config
 *       commands, the output directory is forced to {@code <localOutputPath>/output}, and the
 *       clone-cluster concurrency and correlation id options are added when not already
 *       present.</li>
 *   <li>Optionally clears temp files, then downloads the config and queries files named by the
 *       S3 paths and passes the downloaded files to {@code updateArgs}.</li>
 *   <li>Adds the Neptune ML profile when {@code additionalParams} contains a node for it.</li>
 *   <li>Verifies the S3 output location is empty unless an export subdirectory is created or
 *       overwriting is allowed.</li>
 *   <li>Registers the S3 upload handler plus the optional Neptune ML and incremental export
 *       handlers, notifies them via {@code onBeforeExport}, and runs the export.</li>
 * </ol>
 *
 * @return the value of {@code result()} on the S3 export event handler after the run
 * @throws IOException declared by this method; propagated from the download and upload steps
 * @throws RuntimeException wrapping any exception raised while preparing the arguments
 */
public S3ObjectInfo execute() throws IOException {

    Args args;

    try {
        args = new Args(cmd);

        // Files supplied through S3 replace whatever the command line pointed at; the local
        // paths are added back once the files have been downloaded.
        if (StringUtils.isNotEmpty(configFileS3Path)) {
            args.removeOptions("-c", "--config-file");
        }
        if (StringUtils.isNotEmpty(queriesFileS3Path)) {
            args.removeOptions("--queries");
        }

        if (args.contains("create-pg-config") ||
                args.contains("export-pg") ||
                args.contains("export-pg-from-config") ||
                args.contains("export-pg-from-queries") ||
                args.contains("export-rdf")) {

            // Always write under the service-managed local output directory.
            args.removeOptions("-d", "--dir");
            args.addOption("-d", new File(localOutputPath, "output").getAbsolutePath());

            if (maxConcurrency > 0 && !args.contains("--clone-cluster-max-concurrency")) {
                args.addOption("--clone-cluster-max-concurrency", String.valueOf(maxConcurrency));
            }

            if (!args.contains("--clone-cluster-correlation-id")) {
                String correlationId = EnvironmentVariableUtils.getOptionalEnv("AWS_BATCH_JOB_ID", null);
                if (StringUtils.isNotEmpty(correlationId)) {
                    args.addOption("--clone-cluster-correlation-id", correlationId);
                }
            }
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    try (TransferManagerWrapper transferManager = new TransferManagerWrapper(s3Region, s3CredentialsProvider)) {

        if (cleanOutputPath) {
            clearTempFiles();
        }

        if (StringUtils.isNotEmpty(configFileS3Path)) {
            updateArgs(args, "--config-file", downloadFile(transferManager.get(), configFileS3Path));
        }
        if (StringUtils.isNotEmpty(queriesFileS3Path)) {
            updateArgs(args, "--queries", downloadFile(transferManager.get(), queriesFileS3Path));
        }
    }

    // A Neptune ML node in the additional params implies the Neptune ML profile.
    if (additionalParams.has(NEPTUNE_ML_PROFILE_NAME) && (!args.contains("--profile", NEPTUNE_ML_PROFILE_NAME))) {
        args.addOption("--profile", NEPTUNE_ML_PROFILE_NAME);
    }

    Collection<String> profiles = args.getOptionValues("--profile");

    if (!createExportSubdirectory && !overwriteExisting) {
        checkS3OutputIsEmpty();
    }

    EventHandlerCollection eventHandlerCollection = new EventHandlerCollection();

    // Shared with the S3 handler below; handlers registered later may add themselves to it.
    Collection<CompletionFileWriter> completionFileWriters = new ArrayList<>();

    ExportToS3NeptuneExportEventHandler.S3UploadParams s3UploadParams =
            new ExportToS3NeptuneExportEventHandler.S3UploadParams()
                    .setCreateExportSubdirectory(createExportSubdirectory)
                    .setOverwriteExisting(overwriteExisting);

    ExportToS3NeptuneExportEventHandler exportToS3EventHandler = new ExportToS3NeptuneExportEventHandler(
            localOutputPath,
            outputS3Path,
            s3Region,
            completionFileS3Path,
            completionFilePayload,
            uploadToS3OnError,
            s3UploadParams,
            profiles,
            completionFileWriters,
            sseKmsKeyId,
            s3CredentialsProvider);

    eventHandlerCollection.addHandler(exportToS3EventHandler);

    if (profiles.contains(NEPTUNE_ML_PROFILE_NAME)) {

        JsonNode neptuneMlNode = additionalParams.path(NEPTUNE_ML_PROFILE_NAME);

        // textValue() yields null when "version" is absent or is not a JSON string, so the
        // null check keeps a malformed version from raising a NullPointerException.
        String neptuneMlVersion = neptuneMlNode.path("version").textValue();

        // The V2 handler is the default; only an explicit "v1." version selects V1.
        boolean useV1 = neptuneMlVersion != null && neptuneMlVersion.startsWith("v1.");

        if (useV1) {
            NeptuneMachineLearningExportEventHandlerV1 neptuneMlEventHandler =
                    new NeptuneMachineLearningExportEventHandlerV1(
                            outputS3Path,
                            s3Region,
                            createExportSubdirectory,
                            additionalParams,
                            args,
                            profiles,
                            sseKmsKeyId,
                            s3CredentialsProvider);
            eventHandlerCollection.addHandler(neptuneMlEventHandler);
        } else {
            NeptuneMachineLearningExportEventHandlerV2 neptuneMlEventHandler =
                    new NeptuneMachineLearningExportEventHandlerV2(
                            outputS3Path,
                            s3Region,
                            createExportSubdirectory,
                            additionalParams,
                            args,
                            profiles,
                            sseKmsKeyId,
                            s3CredentialsProvider);
            eventHandlerCollection.addHandler(neptuneMlEventHandler);
        }
    }

    if (profiles.contains(INCREMENTAL_EXPORT_PROFILE_NAME)) {
        IncrementalExportEventHandler incrementalExportEventHandler = new IncrementalExportEventHandler(additionalParams);
        completionFileWriters.add(incrementalExportEventHandler);
        eventHandlerCollection.addHandler(incrementalExportEventHandler);
    }

    // We are removing a buffer of 1000 from maxFileDescriptorCount, which is used at
    // com.amazonaws.services.neptune.propertygraph.io.LabelWriters#put, since the value
    // received from the neptune-export service is set as the `nofile` ulimit in the AWS Batch
    // container properties and there might be other processes on the container having open
    // files. This ensures we close the least recently accessed files before exceeding the hard
    // limit for the `nofile` ulimit.
    // NOTE(review): Math.max makes MAX_FILE_DESCRIPTOR_COUNT a lower bound on the value passed
    // to the runner - confirm that a floor (rather than a cap) is what is intended here.
    final int maxFileDescriptorCountAfterRemovingBuffer = Math.max(maxFileDescriptorCount - 1000, MAX_FILE_DESCRIPTOR_COUNT);

    eventHandlerCollection.onBeforeExport(args, s3UploadParams);

    logger.info("Args after service init: {}", String.join(" ", args.values()));

    new NeptuneExportRunner(args.values(), eventHandlerCollection, false, maxFileDescriptorCountAfterRemovingBuffer).run();

    return exportToS3EventHandler.result();
}