public void onTrigger()

in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java [199:391]


    /**
     * Writes the content of the next available FlowFile to a file inside the configured directory.
     *
     * <p>The content is first exported to a temporary "dot" file (the target filename prefixed with
     * {@code .}) in the destination directory. The last-modified time, permissions, owner and group are then
     * applied to that file on a best-effort basis (a failure is logged as a warning and processing continues),
     * and finally the dot file is renamed to the target filename.</p>
     *
     * <p>The FlowFile is routed to {@code REL_SUCCESS} when the rename succeeds, or when the target already
     * exists and the conflict resolution is {@code IGNORE_RESOLUTION}. In every other terminal case handled
     * here it is penalized and routed to {@code REL_FAILURE}.</p>
     *
     * @param context supplies the processor property values
     * @param session used to obtain the FlowFile, export its content, report provenance and transfer it
     */
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final StopWatch stopWatch = new StopWatch(true);
        final Path configuredRootDirPath = Paths.get(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue());
        final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
        final Integer maxDestinationFiles = context.getProperty(MAX_DESTINATION_FILES).asInteger();
        final ComponentLog logger = getLogger();

        // Only set once the temporary file may exist, so the catch block knows whether cleanup is needed.
        Path tempDotCopyFile = null;
        try {
            final Path rootDirPath = configuredRootDirPath.toAbsolutePath();
            final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
            final Path tempCopyFile = rootDirPath.resolve("." + filename);
            final Path copyFile = rootDirPath.resolve(filename);

            final String permissions = context.getProperty(CHANGE_PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue();
            final String owner = context.getProperty(CHANGE_OWNER).evaluateAttributeExpressions(flowFile).getValue();
            final String group = context.getProperty(CHANGE_GROUP).evaluateAttributeExpressions(flowFile).getValue();
            final boolean changePermissions = permissions != null && !permissions.trim().isEmpty();
            final boolean changeOwner = owner != null && !owner.trim().isEmpty();
            final boolean changeGroup = group != null && !group.trim().isEmpty();

            if (!Files.exists(rootDirPath)) {
                if (!context.getProperty(CREATE_DIRS).asBoolean()) {
                    flowFile = session.penalize(flowFile);
                    session.transfer(flowFile, REL_FAILURE);
                    logger.error("Penalizing {} and routing to 'failure' because the output directory {} does not exist and Processor is configured not to create missing directories",
                            flowFile, rootDirPath);
                    return;
                }

                // Remember the deepest ancestor that already exists so that owner/group are only changed on
                // directories created below. getParent() returns null once the root has been passed, so stop
                // there instead of handing null to Files.exists().
                Path existing = rootDirPath;
                while (existing != null && !Files.exists(existing)) {
                    existing = existing.getParent();
                }

                if (changePermissions) {
                    try {
                        final String perms = stringPermissions(permissions, true);
                        if (!perms.isEmpty()) {
                            Files.createDirectories(rootDirPath, PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString(perms)));
                        } else {
                            Files.createDirectories(rootDirPath);
                        }
                    } catch (Exception e) {
                        flowFile = session.penalize(flowFile);
                        session.transfer(flowFile, REL_FAILURE);
                        logger.error("Could not set create directory with permissions {}", permissions, e);
                        return;
                    }
                } else {
                    Files.createDirectories(rootDirPath);
                }

                if (changeOwner || changeGroup) {
                    // Walk upwards from the new leaf directory until the pre-existing ancestor is reached.
                    Path currentPath = rootDirPath;
                    while (currentPath != null && !currentPath.equals(existing)) {
                        if (changeOwner) {
                            try {
                                UserPrincipalLookupService lookupService = currentPath.getFileSystem().getUserPrincipalLookupService();
                                Files.setOwner(currentPath, lookupService.lookupPrincipalByName(owner));
                            } catch (Exception e) {
                                logger.warn("Could not set directory owner to {}", owner, e);
                            }
                        }
                        if (changeGroup) {
                            try {
                                UserPrincipalLookupService lookupService = currentPath.getFileSystem().getUserPrincipalLookupService();
                                PosixFileAttributeView view = Files.getFileAttributeView(currentPath, PosixFileAttributeView.class);
                                view.setGroup(lookupService.lookupPrincipalByGroupName(group));
                            } catch (Exception e) {
                                logger.warn("Could not set directory group to {}", group, e);
                            }
                        }
                        currentPath = currentPath.getParent();
                    }
                }
            }

            tempDotCopyFile = tempCopyFile;

            final Path copyFileDir = copyFile.getParent();
            if (Files.exists(copyFileDir) && maxDestinationFiles != null) { // check if too many files already
                final long numFiles = getFilesNumberInFolder(copyFileDir, filename);

                if (numFiles >= maxDestinationFiles) {
                    flowFile = session.penalize(flowFile);
                    logger.warn("Penalizing {} and routing to 'failure' because the output directory {} has {} files, which exceeds the configured maximum number of files",
                            flowFile, copyFileDir, numFiles);
                    session.transfer(flowFile, REL_FAILURE);
                    return;
                }
            }

            if (Files.exists(copyFile)) {
                switch (conflictResponse) {
                    case REPLACE_RESOLUTION:
                        Files.delete(copyFile);
                        logger.info("Deleted {} as configured in order to replace with the contents of {}", copyFile, flowFile);
                        break;
                    case IGNORE_RESOLUTION:
                        session.transfer(flowFile, REL_SUCCESS);
                        logger.info("Transferring {} to success because file with same name already exists", flowFile);
                        return;
                    case FAIL_RESOLUTION:
                        flowFile = session.penalize(flowFile);
                        logger.warn("Penalizing {} and routing to failure as configured because file with the same name already exists", flowFile);
                        session.transfer(flowFile, REL_FAILURE);
                        return;
                    default:
                        break;
                }
            }

            session.exportTo(flowFile, tempCopyFile, false);

            // The attribute changes below are best effort: a failure is logged and the write continues.
            final String lastModifiedTime = context.getProperty(CHANGE_LAST_MODIFIED_TIME).evaluateAttributeExpressions(flowFile).getValue();
            if (lastModifiedTime != null && !lastModifiedTime.trim().isEmpty()) {
                try {
                    final OffsetDateTime fileModifyTime = OffsetDateTime.parse(lastModifiedTime, DATE_TIME_FORMATTER);
                    // File.setLastModified reports failure through its return value rather than an exception.
                    if (!tempCopyFile.toFile().setLastModified(fileModifyTime.toInstant().toEpochMilli())) {
                        logger.warn("Could not set file lastModifiedTime to {}", lastModifiedTime);
                    }
                } catch (Exception e) {
                    logger.warn("Could not set file lastModifiedTime to {}", lastModifiedTime, e);
                }
            }

            if (changePermissions) {
                try {
                    final String perms = stringPermissions(permissions, false);
                    if (!perms.isEmpty()) {
                        Files.setPosixFilePermissions(tempCopyFile, PosixFilePermissions.fromString(perms));
                    }
                } catch (Exception e) {
                    logger.warn("Could not set file permissions to {}", permissions, e);
                }
            }

            if (changeOwner) {
                try {
                    UserPrincipalLookupService lookupService = tempCopyFile.getFileSystem().getUserPrincipalLookupService();
                    Files.setOwner(tempCopyFile, lookupService.lookupPrincipalByName(owner));
                } catch (Exception e) {
                    logger.warn("Could not set file owner to {}", owner, e);
                }
            }

            if (changeGroup) {
                try {
                    UserPrincipalLookupService lookupService = tempCopyFile.getFileSystem().getUserPrincipalLookupService();
                    PosixFileAttributeView view = Files.getFileAttributeView(tempCopyFile, PosixFileAttributeView.class);
                    view.setGroup(lookupService.lookupPrincipalByGroupName(group));
                } catch (Exception e) {
                    logger.warn("Could not set file group to {}", group, e);
                }
            }

            boolean renamed = false;
            for (int i = 0; i < 10; i++) { // try rename up to 10 times.
                if (tempCopyFile.toFile().renameTo(copyFile.toFile())) {
                    renamed = true;
                    break; // rename was successful
                }
                Thread.sleep(100L); // try waiting a few ms to let whatever might cause rename failure to resolve
            }

            if (!renamed) {
                if (Files.exists(tempCopyFile) && tempCopyFile.toFile().delete()) {
                    logger.debug("Deleted dot copy file {}", tempCopyFile);
                }
                throw new ProcessException("Could not rename: " + tempCopyFile);
            } else {
                logger.info("Produced copy of {} at location {}", flowFile, copyFile);
            }

            session.getProvenanceReporter().send(flowFile, copyFile.toFile().toURI().toString(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            session.transfer(flowFile, REL_SUCCESS);
        } catch (final Throwable t) {
            if (tempDotCopyFile != null) {
                try {
                    Files.deleteIfExists(tempDotCopyFile);
                } catch (final Exception e) {
                    logger.error("Unable to remove temporary file {}", tempDotCopyFile, e);
                }
            }

            flowFile = session.penalize(flowFile);
            logger.error("Penalizing {} and transferring to failure", flowFile, t);
            session.transfer(flowFile, REL_FAILURE);

            if (t instanceof InterruptedException) {
                // Thread.sleep() cleared the interrupt status when it threw; restore it so the caller
                // can still observe that this thread was interrupted.
                Thread.currentThread().interrupt();
            }
        }
    }