public static List createProcessEntity()

in addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java [210:288]


    /**
     * Builds the Atlas referenceables that describe a Falcon process, one cluster at a time.
     *
     * <p>For every cluster listed on the process the returned list receives, in this order:
     * the cluster reference, one feed data-set reference per process input, one per process
     * output, and finally - only when at least one input or output reference was produced -
     * a {@code FalconDataTypes.FALCON_PROCESS} entity that points at those references.
     *
     * @param process     the Falcon process definition to convert
     * @param falconStore store queried by name for the cluster and feed definitions
     * @return the referenceables created; an empty list when the process declares no clusters
     * @throws Exception propagated unchanged from any call made here (nothing is caught)
     */
    public static List<Referenceable> createProcessEntity(org.apache.falcon.entity.v0.process.Process process,
                                                          ConfigurationStore falconStore) throws Exception {
        final String processName = process.getName();
        LOG.info("Creating process Entity : {}", processName);

        final List<Referenceable> result = new ArrayList<>();
        if (process.getClusters() == null) {
            // Nothing to run on, hence nothing to register.
            return result;
        }

        // One process entity per cluster: its qualified name is derived from the
        // process name together with the cluster name.
        for (Cluster clusterRef : process.getClusters().getClusters()) {
            final org.apache.falcon.entity.v0.cluster.Cluster clusterDef =
                    falconStore.get(EntityType.CLUSTER, clusterRef.getName());
            final Referenceable clusterEntity =
                    getClusterEntityReference(clusterDef.getName(), clusterDef.getColo());
            result.add(clusterEntity);

            // Feeds consumed by the process, resolved against this cluster.
            final List<Referenceable> inputRefs = new ArrayList<>();
            if (process.getInputs() != null) {
                for (Input in : process.getInputs().getInputs()) {
                    Feed inputFeed = falconStore.get(EntityType.FEED, in.getFeed());
                    Referenceable inputRef = getFeedDataSetReference(inputFeed, clusterEntity);
                    result.add(inputRef);
                    inputRefs.add(inputRef);
                }
            }

            // Feeds produced by the process, resolved against this cluster.
            final List<Referenceable> outputRefs = new ArrayList<>();
            if (process.getOutputs() != null) {
                for (Output out : process.getOutputs().getOutputs()) {
                    Feed outputFeed = falconStore.get(EntityType.FEED, out.getFeed());
                    Referenceable outputRef = getFeedDataSetReference(outputFeed, clusterEntity);
                    result.add(outputRef);
                    outputRefs.add(outputRef);
                }
            }

            final boolean hasInputs = !inputRefs.isEmpty();
            final boolean hasOutputs = !outputRefs.isEmpty();
            if (!hasInputs && !hasOutputs) {
                // No feed on either side: no process entity is emitted for this cluster.
                continue;
            }

            final Referenceable processRef = new Referenceable(FalconDataTypes.FALCON_PROCESS.getName());
            processRef.set(AtlasClient.NAME, processName);
            processRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                    getProcessQualifiedName(processName, clusterDef.getName()));
            processRef.set(FalconBridge.FREQUENCY, process.getFrequency().toString());

            if (hasInputs) {
                processRef.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, inputRefs);
            }
            if (hasOutputs) {
                processRef.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, outputRefs);
            }

            // Cluster the process runs on.
            processRef.set(FalconBridge.RUNSON, clusterEntity);

            // Owner comes from the ACL, when one is defined.
            if (process.getACL() != null) {
                processRef.set(AtlasClient.OWNER, process.getACL().getOwner());
            }

            // Tags are stored as a key/value map parsed from the raw tag string.
            if (StringUtils.isNotEmpty(process.getTags())) {
                processRef.set(FalconBridge.TAGS,
                        EventUtil.convertKeyValueStringToMap(process.getTags()));
            }

            if (process.getPipelines() != null) {
                processRef.set(FalconBridge.PIPELINES, process.getPipelines());
            }

            processRef.set(FalconBridge.WFPROPERTIES,
                    getProcessEntityWFProperties(process.getWorkflow(), processName));

            result.add(processRef);
        }

        return result;
    }