private void registerProcess()

in addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java [631:703]


    /**
     * Registers lineage for a Hive query as an Atlas process entity.
     *
     * <p>Select queries are skipped. For any other query, the event's read and write entities are
     * sorted with {@code entityComparator} and handed to {@code processHiveEntity}, which fills the
     * {@code source}/{@code target} maps and collects related entities. If at least one source or
     * target was produced, a process referenceable is built, followed by any column-lineage process
     * instances. All collected entities are then attached to the event as a single
     * {@link HookNotification.EntityUpdateRequest}.
     *
     * @param dgiBridge bridge passed through to the entity/process builders
     * @param event     hook event carrying inputs, outputs, query text, user and lineage info
     * @throws AtlasHookException wrapping any exception raised while building the entities
     */
    private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws AtlasHookException {
        try {
            final Set<ReadEntity> inputs = event.getInputs();
            final Set<WriteEntity> outputs = event.getOutputs();

            // Treat a missing set the same as an empty one; previously a null set caused an NPE here.
            final boolean noInputs = inputs == null || inputs.isEmpty();
            final boolean noOutputs = outputs == null || outputs.isEmpty();

            //Even explain CTAS has operation name as CREATETABLE_AS_SELECT
            if (noInputs && noOutputs) {
                LOG.info("Explain statement. Skipping...");
                return;
            }

            if (event.getQueryId() == null) {
                LOG.info("Query id/plan is missing for {}", event.getQueryStr());
            }

            if (isSelectQuery(event)) {
                // filter out select queries which do not modify data
                LOG.info("Skipped query {} for processing since it is a select query ", event.getQueryStr());
                return;
            }

            final SortedMap<ReadEntity, Referenceable> source = new TreeMap<>(entityComparator);
            final SortedMap<WriteEntity, Referenceable> target = new TreeMap<>(entityComparator);

            final Set<String> dataSets = new HashSet<>();
            // Insertion-ordered so the notification lists entities in the order they were produced.
            final Set<Referenceable> entities = new LinkedHashSet<>();

            final SortedSet<ReadEntity> sortedHiveInputs = new TreeSet<>(entityComparator);
            if (inputs != null) {
                sortedHiveInputs.addAll(inputs);
            }

            final SortedSet<WriteEntity> sortedHiveOutputs = new TreeSet<>(entityComparator);
            if (outputs != null) {
                sortedHiveOutputs.addAll(outputs);
            }

            for (ReadEntity readEntity : sortedHiveInputs) {
                processHiveEntity(dgiBridge, event, readEntity, dataSets, source, entities);
            }

            for (WriteEntity writeEntity : sortedHiveOutputs) {
                processHiveEntity(dgiBridge, event, writeEntity, dataSets, target, entities);
            }

            if (source.isEmpty() && target.isEmpty()) {
                LOG.info("Skipped query {} since it has no getInputs() or resulting getOutputs()", event.getQueryStr());
                return;
            }

            final Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, sortedHiveInputs, sortedHiveOutputs, source, target);

            // setup Column Lineage (best-effort: a failure here must not block the table-level process)
            List<Referenceable> colLineageProcessInstances = null;
            try {
                List<Referenceable> sourceList = new ArrayList<>(source.values());
                List<Referenceable> targetList = new ArrayList<>(target.values());
                Map<String, Referenceable> columnQNameToRef =
                        ColumnLineageUtils.buildColumnReferenceableMap(sourceList, targetList);
                colLineageProcessInstances = createColumnLineageProcessInstances(processReferenceable,
                        event.lineageInfo,
                        columnQNameToRef);
            } catch (Exception e) {
                // Pass e as the throwable (no "{}" placeholder) so SLF4J logs the full stack trace.
                LOG.warn("Column lineage process setup failed with exception", e);
            }

            // The process entity precedes its column-lineage processes. Adding to the ordered set
            // avoids mutating (or NPE-ing on) the list returned by the helper.
            entities.add(processReferenceable);
            if (colLineageProcessInstances != null) {
                entities.addAll(colLineageProcessInstances);
            }

            event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<>(entities)));
        }
        catch(Exception e) {
            throw new AtlasHookException("HiveHook.registerProcess() failed.", e);
        }
    }