public void onTrigger(ProcessContext context, ProcessSession session)

in nifi-extension-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java [320:556]


    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final FileSystem hdfs = getFileSystem();
        final Configuration configuration = getConfiguration();
        final UserGroupInformation ugi = getUserGroupInformation();

        if (configuration == null || hdfs == null || ugi == null) {
            getLogger().error("HDFS not configured properly");
            session.transfer(flowFile, getFailureRelationship());
            context.yield();
            return;
        }

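        // All filesystem access below runs as the resolved Hadoop user (e.g. the configured Kerberos principal),
        // so HDFS permission checks apply to that user rather than to the NiFi JVM user.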
        ugi.doAs(new PrivilegedAction<>() {
            @Override
            public Object run() {
                Path tempDotCopyFile = null;
                FlowFile putFlowFile = flowFile;
                try {
                    final String writingStrategy = context.getProperty(WRITING_STRATEGY).getValue();
                    final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
                    final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
                    final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
                    final int bufferSize = getBufferSize(context, session, putFlowFile);
                    final short replication = getReplication(context, session, putFlowFile, dirPath);

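                    // A configured compression codec also appends its default extension (e.g. ".gz") to the destination filename.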
                    final CompressionCodec codec = getCompressionCodec(context, configuration);

                    final String filename = codec != null
                            ? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
                            : putFlowFile.getAttribute(CoreAttributes.FILENAME.key());

                    final Path tempCopyFile = new Path(dirPath, "." + filename);
                    final Path copyFile = new Path(dirPath, filename);

                    // With Write-and-Rename, content first lands in a dot-prefixed temporary file that is renamed
                    // into place after a successful write; Simple Write streams directly to the final path
                    final Path actualCopyFile = (writingStrategy.equals(WRITE_AND_RENAME))
                            ? tempCopyFile
                            : copyFile;

                    // Create destination directory if it does not exist
                    boolean targetDirCreated = false;
                    try {
                        final FileStatus fileStatus = hdfs.getFileStatus(dirPath);
                        if (!fileStatus.isDirectory()) {
                            throw new IOException(dirPath.toString() + " already exists and is not a directory");
                        }
                        if (fileStatus.hasAcl()) {
                            checkAclStatus(getAclStatus(dirPath));
                        }
                    } catch (FileNotFoundException fe) {
                        targetDirCreated = hdfs.mkdirs(dirPath);
                        if (!targetDirCreated) {
                            throw new IOException(dirPath.toString() + " could not be created");
                        }
                        final FileStatus fileStatus = hdfs.getFileStatus(dirPath);
                        if (fileStatus.hasAcl()) {
                            checkAclStatus(getAclStatus(dirPath));
                        }
                        changeOwner(context, hdfs, dirPath, flowFile);
                    }

                    final boolean destinationExists = hdfs.exists(copyFile);

                    // If destination file already exists, resolve that based on processor configuration
                    if (destinationExists) {
                        switch (conflictResponse) {
                            case REPLACE_RESOLUTION:
                                if (hdfs.delete(copyFile, false)) {
                                    getLogger().info("deleted {} in order to replace with the contents of {}",
                                            copyFile, putFlowFile);
                                }
                                break;
                            case IGNORE_RESOLUTION:
                                session.transfer(putFlowFile, getSuccessRelationship());
                                getLogger().info("transferring {} to success because file with same name already exists",
                                        putFlowFile);
                                return null;
                            case FAIL_RESOLUTION:
                                session.transfer(session.penalize(putFlowFile), getFailureRelationship());
                                getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
                                        putFlowFile);
                                return null;
                            default:
                                break;
                        }
                    }

                    // Stream the FlowFile content to HDFS (dot file or final path, depending on the writing strategy)
                    final StopWatch stopWatch = new StopWatch(true);
                    final ResourceTransferSource resourceTransferSource = context.getProperty(RESOURCE_TRANSFER_SOURCE).asAllowableValue(ResourceTransferSource.class);
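                    // The bytes to write come either from an external file resource or, by default, from the FlowFile content itself.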
                    try (final InputStream in = getFileResource(resourceTransferSource, context, flowFile.getAttributes())
                            .map(FileResource::getInputStream).orElseGet(() -> session.read(flowFile))) {
                        OutputStream fos = null;
                        Path createdFile = null;
                        try {
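                            // Append in place when the conflict strategy is APPEND and the file exists; otherwise create,
                            // applying the cluster umask and the optional client-locality hint.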
                            if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
                                fos = hdfs.append(copyFile, bufferSize);
                            } else {
                                final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);

                                if (shouldIgnoreLocality(context, session)) {
                                    cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                                }

                                fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
                                                FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
                                        null, null);
                            }

                            if (codec != null) {
                                fos = codec.createOutputStream(fos);
                            }
                            createdFile = actualCopyFile;

                            if (APPEND_RESOLUTION.equals(conflictResponse)
                                    && context.getProperty(APPEND_MODE).getValue().equals(AVRO_APPEND_MODE)
                                    && destinationExists) {
                                getLogger().info("Appending avro record to existing avro file");
                                try (final DataFileStream<Object> reader = new DataFileStream<>(in, new GenericDatumReader<>());
                                     final DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
                                    writer.appendTo(new FsInput(copyFile, configuration), fos); // open writer to existing file
                                    writer.appendAllFrom(reader, false); // append flowfile content
                                    writer.flush();
                                    getLogger().info("Successfully appended avro record");
                                } catch (Exception e) {
                                    getLogger().error("Error occurred during appending to existing avro file", e);
                                    throw new ProcessException(e);
                                }
                            } else {
                                BufferedInputStream bis = new BufferedInputStream(in);
                                StreamUtils.copy(bis, fos);
                                bis = null;
                                fos.flush();
                            }
                        } finally {
                            try {
                                if (fos != null) {
                                    fos.close();
                                }
                            } catch (Throwable t) {
                                // when talking to remote HDFS clusters, we don't notice problems until fos.close()
                                if (createdFile != null) {
                                    try {
                                        hdfs.delete(createdFile, false);
                                    } catch (Throwable ignored) {
                                    }
                                }
                                throw t;
                            }
                            fos = null;
                        }
                    }
                    stopWatch.stop();
                    final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
                    final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
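                    // Remember the dot file so the catch block below can clean it up if anything past this point fails.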
                    tempDotCopyFile = tempCopyFile;

                    // Rename only applies to the Write-and-Rename strategy, and never after an in-place append
                    if (writingStrategy.equals(WRITE_AND_RENAME)
                            && (!conflictResponse.equals(APPEND_RESOLUTION) || !destinationExists)) {
                        boolean renamed = false;

                        for (int i = 0; i < 10; i++) { // try to rename multiple times.
                            if (hdfs.rename(tempCopyFile, copyFile)) {
                                renamed = true;
                                break; // rename was successful
                            }
                            Thread.sleep(200L); // try waiting to let whatever might cause rename failure to resolve
                        }
                        if (!renamed) {
                            hdfs.delete(tempCopyFile, false);
                            throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
                                    + " to its final filename");
                        }

                        // Apply any configured remote owner/group to the renamed file
                        changeOwner(context, hdfs, copyFile, flowFile);
                    }

                    getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
                            putFlowFile, copyFile, millis, dataRate);

                    final String newFilename = copyFile.getName();
                    final String hdfsPath = copyFile.getParent().toString();
                    putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
                    putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
                    putFlowFile = session.putAttribute(putFlowFile, TARGET_HDFS_DIR_CREATED_ATTRIBUTE, String.valueOf(targetDirCreated));
                    final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
                    putFlowFile = session.putAttribute(putFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
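                    // Report a provenance SEND event against the fully qualified HDFS URL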
                    session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());

                    session.transfer(putFlowFile, getSuccessRelationship());

                } catch (final Throwable t) {
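                    // Kerberos/GSS authentication failures roll the session back and yield instead of penalizing the FlowFile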
                    if (handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
                        return null;
                    }
                    if (tempDotCopyFile != null) {
                        try {
                            hdfs.delete(tempDotCopyFile, false);
                        } catch (Exception e) {
                            getLogger().error("Unable to remove temporary file {}", tempDotCopyFile, e);
                        }
                    }
                    getLogger().error("Failed to write to HDFS", t);
                    session.transfer(session.penalize(putFlowFile), getFailureRelationship());
                    context.yield();
                }

                return null;
            }

            private void checkAclStatus(final AclStatus aclStatus) throws IOException {
                final boolean isDefaultACL = aclStatus.getEntries().stream().anyMatch(
                        aclEntry -> AclEntryScope.DEFAULT.equals(aclEntry.getScope()));
                final boolean isSetUmask = context.getProperty(UMASK).isSet();
                if (isDefaultACL && isSetUmask) {
                    throw new IOException("PutHDFS umask setting is ignored by HDFS when HDFS default ACL is set.");
                }
            }

            private AclStatus getAclStatus(final Path dirPath) {
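                // ACL lookups are cached per directory path to avoid repeated getAclStatus calls against the NameNode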
                return aclCache.get(dirPath, fn -> {
                    try {
                        return hdfs.getAclStatus(dirPath);
                    } catch (final IOException e) {
                        throw new UncheckedIOException(String.format("Unable to query ACL for directory [%s]", dirPath), e);
                    }
                });
            }
        });
    }
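
As a usage illustration, the sketch below drives this onTrigger() implementation through NiFi's nifi-mock TestRunner. It is a minimal sketch, not the project's own test: TestablePutHDFS is a hypothetical PutHDFS subclass that overrides getFileSystem() and getConfiguration() to return a locally backed FileSystem (the bundled TestPutHDFS uses the same override technique), the target directory value is arbitrary, and the property/relationship constants are assumed accessible from the test's package.

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;

class PutHDFSSketchTest {

    @Test
    void writesFlowFileToSuccess() {
        // TestablePutHDFS: assumed subclass returning a locally backed FileSystem instead of a live cluster
        final TestRunner runner = TestRunners.newTestRunner(new TestablePutHDFS());
        runner.setProperty(PutHDFS.DIRECTORY, "target/hdfs-out");
        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, PutHDFS.REPLACE_RESOLUTION);

        // Enqueue one FlowFile with a filename attribute, run a single onTrigger invocation,
        // and assert it was routed to the success relationship
        runner.enqueue("sample content".getBytes(StandardCharsets.UTF_8),
                Map.of(CoreAttributes.FILENAME.key(), "sample.txt"));
        runner.run();
        runner.assertAllFlowFilesTransferred(PutHDFS.REL_SUCCESS, 1);
    }
}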