public synchronized void init()

in agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java [123:391]


    public synchronized void init(Properties props, String appType) {
        LOG.info("AuditProviderFactory: initializing..");

        if (mInitDone) {
            LOG.warn("AuditProviderFactory.init(): already initialized! Will try to re-initialize");
        }

        mInitDone        = true;
        componentAppType = appType;

        MiscUtil.setApplicationType(appType);

        boolean isEnabled = MiscUtil.getBooleanProperty(props, AUDIT_IS_ENABLED_PROP, true);

        if (!isEnabled) {
            LOG.info("AuditProviderFactory: Audit not enabled..");

            mProvider = getDefaultProvider();

            return;
        }

        boolean isAuditToHdfsEnabled            = MiscUtil.getBooleanProperty(props, AUDIT_HDFS_IS_ENABLED_PROP, false);
        boolean isAuditToLog4jEnabled           = MiscUtil.getBooleanProperty(props, AUDIT_LOG4J_IS_ENABLED_PROP, false);
        boolean isAuditToKafkaEnabled           = MiscUtil.getBooleanProperty(props, AUDIT_KAFKA_IS_ENABLED_PROP, false);
        boolean isAuditToSolrEnabled            = MiscUtil.getBooleanProperty(props, AUDIT_SOLR_IS_ENABLED_PROP, false);
        boolean isAuditFileCacheProviderEnabled = MiscUtil.getBooleanProperty(props, AUDIT_IS_FILE_CACHE_PROVIDER_ENABLE_PROP, false);

        List<AuditHandler> providers = new ArrayList<>();

        for (Object propNameObj : props.keySet()) {
            LOG.info("AUDIT PROPERTY: {}={}", propNameObj, props.getProperty(propNameObj.toString()));
        }

        // Process new audit configurations
        List<String> destNameList = new ArrayList<>();

        for (Object propNameObj : props.keySet()) {
            String propName = propNameObj.toString();

            if (!propName.startsWith(AUDIT_DEST_BASE)) {
                continue;
            }

            String       destName = propName.substring(AUDIT_DEST_BASE.length() + 1);
            List<String> splits   = MiscUtil.toArray(destName, ".");

            if (splits.size() > 1) {
                continue;
            }

            String value = props.getProperty(propName);

            if (value.equalsIgnoreCase("enable") || value.equalsIgnoreCase("enabled") || value.equalsIgnoreCase("true")) {
                destNameList.add(destName);

                LOG.info("Audit destination {} is set to {}", propName, value);
            }
        }

        for (String destName : destNameList) {
            String       destPropPrefix = AUDIT_DEST_BASE + "." + destName;
            AuditHandler destProvider   = getProviderFromConfig(props, destPropPrefix, destName, null);

            if (destProvider != null) {
                destProvider.init(props, destPropPrefix);

                String queueName = MiscUtil.getStringProperty(props, destPropPrefix + "." + AuditQueue.PROP_QUEUE);

                if (queueName == null || queueName.isEmpty()) {
                    LOG.info("{}.{} is not set. Setting queue to batch for {}", destPropPrefix, AuditQueue.PROP_QUEUE, destName);

                    queueName = "batch";
                }

                LOG.info("queue for {} is {}", destName, queueName);

                if (queueName != null && !queueName.isEmpty() && !queueName.equalsIgnoreCase("none")) {
                    String       queuePropPrefix = destPropPrefix + "." + queueName;
                    AuditHandler queueProvider   = getProviderFromConfig(props, queuePropPrefix, queueName, destProvider);

                    if (queueProvider != null) {
                        if (queueProvider instanceof AuditQueue) {
                            AuditQueue qProvider = (AuditQueue) queueProvider;

                            qProvider.init(props, queuePropPrefix);

                            providers.add(queueProvider);
                        } else {
                            LOG.error("Provider queue doesn't extend AuditQueue. Destination={} can't be created. queueName={}", destName, queueName);
                        }
                    } else {
                        LOG.error("Queue provider for destination {} can't be created. queueName={}", destName, queueName);
                    }
                } else {
                    LOG.info("Audit destination {} added to provider list", destProvider.getName());

                    providers.add(destProvider);
                }
            }
        }
        if (!providers.isEmpty()) {
            LOG.info("Using v3 audit configuration");

            AuditHandler consumer = providers.get(0);

            // Possible pipeline is:
            // async_queue -> summary_queue -> multidestination -> batch_queue
            // -> hdfs_destination
            // -> batch_queue -> solr_destination
            // -> batch_queue -> kafka_destination
            // Above, up to multidestination, the providers are same, then it
            // branches out in parallel.

            // Set the providers in the reverse order e.g.

            if (providers.size() > 1) {
                // If there are more than one destination, then we need multi destination to process it in parallel
                LOG.info("MultiDestAuditProvider is used. Destination count={}", providers.size());

                MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider();

                multiDestProvider.init(props);
                multiDestProvider.addAuditProviders(providers);

                consumer = multiDestProvider;
            }

            // Let's see if Summary is enabled, then summarize before sending it downstream
            String  propPrefix     = BaseAuditHandler.PROP_DEFAULT_PREFIX;
            boolean summaryEnabled = MiscUtil.getBooleanProperty(props, propPrefix + "." + "summary" + "." + "enabled", false);

            if (summaryEnabled) {
                LOG.info("AuditSummaryQueue is enabled");

                AuditSummaryQueue summaryQueue = new AuditSummaryQueue(consumer);

                summaryQueue.init(props, propPrefix);

                consumer = summaryQueue;
            } else {
                LOG.info("AuditSummaryQueue is disabled");
            }

            if (!isAuditFileCacheProviderEnabled) {
                // Create the AsysnQueue
                AuditAsyncQueue asyncQueue = new AuditAsyncQueue(consumer);

                propPrefix = BaseAuditHandler.PROP_DEFAULT_PREFIX + "." + "async";

                asyncQueue.init(props, propPrefix);
                asyncQueue.setParentPath(componentAppType);

                mProvider = asyncQueue;

                LOG.info("Starting audit queue {}", mProvider.getName());

                mProvider.start();
            } else {
                // Assign AsyncQueue to AuditFileCacheProvider
                AuditFileCacheProvider auditFileCacheProvider = new AuditFileCacheProvider(consumer);

                propPrefix = BaseAuditHandler.PROP_DEFAULT_PREFIX + "." + "filecache";

                auditFileCacheProvider.init(props, propPrefix);
                auditFileCacheProvider.setParentPath(componentAppType);

                mProvider = auditFileCacheProvider;

                LOG.info("Starting Audit File Cache Provider {}", mProvider.getName());

                mProvider.start();
            }
        } else {
            LOG.info("No v3 audit configuration found. Trying v2 audit configurations");

            if (!isEnabled || !(isAuditToHdfsEnabled || isAuditToKafkaEnabled || isAuditToLog4jEnabled || isAuditToSolrEnabled || providers.isEmpty())) {
                LOG.info("AuditProviderFactory: Audit not enabled..");

                mProvider = getDefaultProvider();

                return;
            }

            if (isAuditToHdfsEnabled) {
                LOG.info("HdfsAuditProvider is enabled");

                HdfsAuditProvider hdfsProvider = new HdfsAuditProvider();

                boolean isAuditToHdfsAsync = MiscUtil.getBooleanProperty(props, HdfsAuditProvider.AUDIT_HDFS_IS_ASYNC_PROP, false);

                if (isAuditToHdfsAsync) {
                    int maxQueueSize     = MiscUtil.getIntProperty(props, HdfsAuditProvider.AUDIT_HDFS_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
                    int maxFlushInterval = MiscUtil.getIntProperty(props, HdfsAuditProvider.AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP, AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);

                    AsyncAuditProvider asyncProvider = new AsyncAuditProvider("HdfsAuditProvider", maxQueueSize, maxFlushInterval, hdfsProvider);

                    providers.add(asyncProvider);
                } else {
                    providers.add(hdfsProvider);
                }
            }

            if (isAuditToKafkaEnabled) {
                LOG.info("KafkaAuditProvider is enabled");

                KafkaAuditProvider kafkaProvider = new KafkaAuditProvider();

                kafkaProvider.init(props);

                if (kafkaProvider.isAsync()) {
                    AsyncAuditProvider asyncProvider = new AsyncAuditProvider("MyKafkaAuditProvider", 1000, 1000, kafkaProvider);

                    providers.add(asyncProvider);
                } else {
                    providers.add(kafkaProvider);
                }
            }

            if (isAuditToSolrEnabled) {
                LOG.info("SolrAuditProvider is enabled");

                SolrAuditProvider solrProvider = new SolrAuditProvider();

                solrProvider.init(props);

                if (solrProvider.isAsync()) {
                    AsyncAuditProvider asyncProvider = new AsyncAuditProvider("MySolrAuditProvider", 1000, 1000, solrProvider);

                    providers.add(asyncProvider);
                } else {
                    providers.add(solrProvider);
                }
            }

            if (isAuditToLog4jEnabled) {
                Log4jAuditProvider log4jProvider = new Log4jAuditProvider();

                boolean isAuditToLog4jAsync = MiscUtil.getBooleanProperty(props, Log4jAuditProvider.AUDIT_LOG4J_IS_ASYNC_PROP, false);

                if (isAuditToLog4jAsync) {
                    int maxQueueSize     = MiscUtil.getIntProperty(props, Log4jAuditProvider.AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
                    int maxFlushInterval = MiscUtil.getIntProperty(props, Log4jAuditProvider.AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP, AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);

                    AsyncAuditProvider asyncProvider = new AsyncAuditProvider("Log4jAuditProvider", maxQueueSize, maxFlushInterval, log4jProvider);

                    providers.add(asyncProvider);
                } else {
                    providers.add(log4jProvider);
                }
            }
            if (providers.isEmpty()) {
                mProvider = getDefaultProvider();
            } else if (providers.size() == 1) {
                mProvider = providers.get(0);
            } else {
                MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider();

                multiDestProvider.addAuditProviders(providers);

                mProvider = multiDestProvider;
            }

            mProvider.init(props);
            mProvider.start();
        }

        installJvmShutdownHook(props);
    }