in nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java [320:556]
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final FileSystem hdfs = getFileSystem();
final Configuration configuration = getConfiguration();
final UserGroupInformation ugi = getUserGroupInformation();
if (configuration == null || hdfs == null || ugi == null) {
getLogger().error("HDFS not configured properly");
session.transfer(flowFile, getFailureRelationship());
context.yield();
return;
}
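// Perform the transfer as the resolved Hadoop user (UserGroupInformation) so authenticated clusters are handled correctly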
ugi.doAs(new PrivilegedAction<>() {
@Override
public Object run() {
Path tempDotCopyFile = null;
FlowFile putFlowFile = flowFile;
try {
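// Resolve the writing strategy, target directory, conflict handling, and HDFS write settings from the processor configuration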
final String writingStrategy = context.getProperty(WRITING_STRATEGY).getValue();
final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
final int bufferSize = getBufferSize(context, session, putFlowFile);
final short replication = getReplication(context, session, putFlowFile, dirPath);
final CompressionCodec codec = getCompressionCodec(context, configuration);
final String filename = codec != null
? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
: putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
final Path tempCopyFile = new Path(dirPath, "." + filename);
final Path copyFile = new Path(dirPath, filename);
// Depending on the writing strategy, we might need a temporary file
final Path actualCopyFile = (writingStrategy.equals(WRITE_AND_RENAME))
? tempCopyFile
: copyFile;
// Create destination directory if it does not exist
boolean targetDirCreated = false;
try {
final FileStatus fileStatus = hdfs.getFileStatus(dirPath);
if (!fileStatus.isDirectory()) {
throw new IOException(dirPath.toString() + " already exists and is not a directory");
}
if (fileStatus.hasAcl()) {
checkAclStatus(getAclStatus(dirPath));
}
} catch (FileNotFoundException fe) {
targetDirCreated = hdfs.mkdirs(dirPath);
if (!targetDirCreated) {
throw new IOException(dirPath.toString() + " could not be created");
}
final FileStatus fileStatus = hdfs.getFileStatus(dirPath);
if (fileStatus.hasAcl()) {
checkAclStatus(getAclStatus(dirPath));
}
changeOwner(context, hdfs, dirPath, flowFile);
}
final boolean destinationExists = hdfs.exists(copyFile);
// If destination file already exists, resolve that based on processor configuration
if (destinationExists) {
switch (conflictResponse) {
case REPLACE_RESOLUTION:
if (hdfs.delete(copyFile, false)) {
getLogger().info("deleted {} in order to replace with the contents of {}",
copyFile, putFlowFile);
}
break;
case IGNORE_RESOLUTION:
session.transfer(putFlowFile, getSuccessRelationship());
getLogger().info("transferring {} to success because file with same name already exists",
putFlowFile);
return null;
case FAIL_RESOLUTION:
session.transfer(session.penalize(putFlowFile), getFailureRelationship());
getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
putFlowFile);
return null;
default:
break;
}
}
// Write FlowFile content to the target file on HDFS (a temporary dot file when using the write-and-rename strategy)
final StopWatch stopWatch = new StopWatch(true);
final ResourceTransferSource resourceTransferSource = context.getProperty(RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class);
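// The content to write may come from an external file resource or, by default, from the FlowFile content itself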
try (final InputStream in = getFileResource(resourceTransferSource, context, flowFile.getAttributes())
.map(FileResource::getInputStream).orElseGet(() -> session.read(flowFile))) {
OutputStream fos = null;
Path createdFile = null;
try {
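// Open the destination stream: append to the existing file when resolving conflicts by appending, otherwise create (or overwrite) the target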
if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
fos = hdfs.append(copyFile, bufferSize);
} else {
final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
if (shouldIgnoreLocality(context, session)) {
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
}
fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
null, null);
}
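// Wrap the stream with the configured compression codec, if one was selected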
if (codec != null) {
fos = codec.createOutputStream(fos);
}
createdFile = actualCopyFile;
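// Avro append mode merges the incoming Avro records into the existing Avro container file; otherwise the content is copied byte-for-byte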
if (APPEND_RESOLUTION.equals(conflictResponse)
&& context.getProperty(APPEND_MODE).getValue().equals(AVRO_APPEND_MODE)
&& destinationExists) {
getLogger().info("Appending avro record to existing avro file");
try (final DataFileStream<Object> reader = new DataFileStream<>(in, new GenericDatumReader<>());
final DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
writer.appendTo(new FsInput(copyFile, configuration), fos); // open writer to existing file
writer.appendAllFrom(reader, false); // append flowfile content
writer.flush();
getLogger().info("Successfully appended avro record");
} catch (Exception e) {
getLogger().error("Error occurred during appending to existing avro file", e);
throw new ProcessException(e);
}
} else {
BufferedInputStream bis = new BufferedInputStream(in);
StreamUtils.copy(bis, fos);
bis = null;
fos.flush();
}
} finally {
try {
if (fos != null) {
fos.close();
}
} catch (Throwable t) {
// when talking to remote HDFS clusters, we don't notice problems until fos.close()
if (createdFile != null) {
try {
hdfs.delete(createdFile, false);
} catch (Throwable ignored) {
}
}
throw t;
}
fos = null;
}
}
stopWatch.stop();
final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
tempDotCopyFile = tempCopyFile;
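// For the write-and-rename strategy, promote the temporary dot file to its final name (not needed when appending to an existing file)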
if (
writingStrategy.equals(WRITE_AND_RENAME)
&& (!conflictResponse.equals(APPEND_RESOLUTION) || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists))
) {
boolean renamed = false;
for (int i = 0; i < 10; i++) { // try to rename multiple times.
if (hdfs.rename(tempCopyFile, copyFile)) {
renamed = true;
break; // rename was successful
}
Thread.sleep(200L); // try waiting to let whatever might cause rename failure to resolve
}
if (!renamed) {
hdfs.delete(tempCopyFile, false);
throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
+ " to its final filename");
}
changeOwner(context, hdfs, copyFile, flowFile);
}
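// Update filename and path attributes, report a provenance SEND event, and route the FlowFile to success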
getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
putFlowFile, copyFile, millis, dataRate);
final String newFilename = copyFile.getName();
final String hdfsPath = copyFile.getParent().toString();
putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
putFlowFile = session.putAttribute(putFlowFile, TARGET_HDFS_DIR_CREATED_ATTRIBUTE, String.valueOf(targetDirCreated));
final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
putFlowFile = session.putAttribute(putFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
session.transfer(putFlowFile, getSuccessRelationship());
} catch (final Throwable t) {
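// GSS authentication failures roll back the session and yield instead of routing the FlowFile to failure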
if (handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
return null;
}
if (tempDotCopyFile != null) {
try {
hdfs.delete(tempDotCopyFile, false);
} catch (Exception e) {
getLogger().error("Unable to remove temporary file {}", tempDotCopyFile, e);
}
}
getLogger().error("Failed to write to HDFS", t);
session.transfer(session.penalize(putFlowFile), getFailureRelationship());
context.yield();
}
return null;
}
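// Rejects configurations where the umask property would be silently ignored because the target directory carries a default ACL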
private void checkAclStatus(final AclStatus aclStatus) throws IOException {
final boolean isDefaultACL = aclStatus.getEntries().stream().anyMatch(
aclEntry -> AclEntryScope.DEFAULT.equals(aclEntry.getScope()));
final boolean isSetUmask = context.getProperty(UMASK).isSet();
if (isDefaultACL && isSetUmask) {
throw new IOException("PutHDFS umask setting is ignored by HDFS when HDFS default ACL is set.");
}
}
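// Directory ACL status is resolved through a per-path cache to avoid repeated lookups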
private AclStatus getAclStatus(final Path dirPath) {
return aclCache.get(dirPath, fn -> {
try {
return hdfs.getAclStatus(dirPath);
} catch (final IOException e) {
throw new UncheckedIOException(String.format("Unable to query ACL for directory [%s]", dirPath), e);
}
});
}
});
}