private Referenceable createDataSet()

in addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java [178:255]


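    /**
     * Creates an Atlas DataSet entity (kafka_topic, hbase_table, hdfs_path or
     * hive_table) for a known Storm spout/bolt, using the component's flattened
     * field values to populate the attributes and qualified name. The entity,
     * along with any supporting entity such as the Hive database, is added to
     * dependentEntities. Returns null for unrecognized component types.
     */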
    private Referenceable createDataSet(String name, String topologyOwner,
                                        Serializable instance,
                                        Map stormConf, List<Referenceable> dependentEntities) throws IllegalAccessException {
        // Flatten the component's nested field values into dotted keys
        // (e.g. "KafkaSpout._spoutConfig.topic"), as consumed below
        Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null);

        String clusterName = null;
        Referenceable dataSetReferenceable;
        // todo: need to redo this with a config driven approach
        switch (name) {
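            // KafkaSpout source: model the topic the spout reads from as a kafka_topic entity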
            case "KafkaSpout":
                dataSetReferenceable = new Referenceable(StormDataTypes.KAFKA_TOPIC.getName());
                final String topicName = config.get("KafkaSpout._spoutConfig.topic");
                dataSetReferenceable.set("topic", topicName);
                dataSetReferenceable.set("uri",
                        config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr"));
                if (StringUtils.isEmpty(topologyOwner)) {
                    topologyOwner = ANONYMOUS_OWNER;
                }
                dataSetReferenceable.set(AtlasClient.OWNER, topologyOwner);
                dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                        getKafkaTopicQualifiedName(getClusterName(stormConf), topicName));
                dataSetReferenceable.set(AtlasClient.NAME, topicName);
                break;

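            // HBaseBolt sink: model the target table as an hbase_table entity (namespace assumed 'default' for now)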
            case "HBaseBolt":
                dataSetReferenceable = new Referenceable(StormDataTypes.HBASE_TABLE.getName());
                final String hbaseTableName = config.get("HBaseBolt.tableName");
                dataSetReferenceable.set("uri", stormConf.get("hbase.rootdir"));
                dataSetReferenceable.set(AtlasClient.NAME, hbaseTableName);
                dataSetReferenceable.set(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal"));
                clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf);
                //TODO - HBase namespace is hardcoded to 'default'; need to check how to get this, or whether it is already part of tableName
                dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT,
                        hbaseTableName));
                break;

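            // HdfsBolt sink: model the target location as an hdfs_path entity, qualified by the fs URL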
            case "HdfsBolt":
                dataSetReferenceable = new Referenceable(HiveMetaStoreBridge.HDFS_PATH);
                String hdfsUri = config.get("HdfsBolt.rotationActions") == null
                        ? config.get("HdfsBolt.fileNameFormat.path")
                        : config.get("HdfsBolt.rotationActions");
                final String hdfsPathStr = config.get("HdfsBolt.fsUrl") + hdfsUri;
                dataSetReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
                dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, hdfsPathStr);
                dataSetReferenceable.set("path", hdfsPathStr);
                dataSetReferenceable.set(AtlasClient.OWNER, stormConf.get("hdfs.kerberos.principal"));
                final Path hdfsPath = new Path(hdfsPathStr);
                dataSetReferenceable.set(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase());
                break;

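            // HiveBolt sink: model both the Hive database and the target hive_table entity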
            case "HiveBolt":
                // todo: verify if hive table has everything needed to retrieve existing table
                Referenceable dbReferenceable = new Referenceable("hive_db");
                String databaseName = config.get("HiveBolt.options.databaseName");
                dbReferenceable.set(AtlasClient.NAME, databaseName);
                dbReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                        HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), databaseName));
                dbReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
                dependentEntities.add(dbReferenceable);
                clusterName = extractComponentClusterName(new HiveConf(), stormConf);
                final String hiveTableName = config.get("HiveBolt.options.tableName");
                dataSetReferenceable = new Referenceable("hive_table");
                final String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(clusterName,
                        databaseName, hiveTableName);
                dataSetReferenceable.set(AtlasClient.NAME, hiveTableName);
                dataSetReferenceable.set(HiveMetaStoreBridge.DB, dbReferenceable);
                dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
                break;

            default:
                // custom component - no DataSet entity is created for it yet
                //TODO - What should we do for custom data sets? Not sure what name we can set here.
                return null;
        }
        dependentEntities.add(dataSetReferenceable);

        return dataSetReferenceable;
    }
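
For reference, each branch sets AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME (qualifiedName), the unique attribute Atlas uses to match an entity against one that already exists. The sketch below illustrates the name@cluster convention these qualified names appear to follow; the helper methods are hypothetical stand-ins for getKafkaTopicQualifiedName, getHbaseTableQualifiedName and the HiveMetaStoreBridge helpers, and the exact formats are assumptions, not the actual Atlas implementations.

    // Minimal, self-contained sketch of the assumed qualified-name formats.
    // Method names and formats are illustrative stand-ins, not the real helpers.
    public class QualifiedNameSketch {

        // kafka_topic: topic@cluster (assumed)
        static String kafkaTopicQualifiedName(String clusterName, String topic) {
            return String.format("%s@%s", topic, clusterName);
        }

        // hbase_table: namespace.table@cluster (assumed)
        static String hbaseTableQualifiedName(String clusterName, String nameSpace, String tableName) {
            return String.format("%s.%s@%s", nameSpace, tableName, clusterName);
        }

        // hive_db: db@cluster (assumed)
        static String dbQualifiedName(String clusterName, String dbName) {
            return String.format("%s@%s", dbName.toLowerCase(), clusterName);
        }

        // hive_table: db.table@cluster (assumed)
        static String tableQualifiedName(String clusterName, String dbName, String tableName) {
            return String.format("%s.%s@%s", dbName.toLowerCase(), tableName.toLowerCase(), clusterName);
        }

        public static void main(String[] args) {
            System.out.println(kafkaTopicQualifiedName("cl1", "orders"));        // orders@cl1
            System.out.println(hbaseTableQualifiedName("cl1", "default", "t1")); // default.t1@cl1
            System.out.println(tableQualifiedName("cl1", "sales", "orders"));    // sales.orders@cl1
        }
    }

If a branch produced a qualified name in a different format than an existing entity, Atlas would register a duplicate entity instead of linking to the existing one, which is why these helpers are centralized in the bridge classes.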