in addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java [178:255]
/**
 * Builds a {@link Referenceable} for the data set that a Storm spout/bolt reads from or
 * writes to, based on the component's simple class name and its reflected field values.
 * <p>
 * Recognized components: {@code KafkaSpout} (kafka_topic), {@code HBaseBolt} (hbase_table),
 * {@code HdfsBolt} (hdfs_path) and {@code HiveBolt} (hive_table plus its hive_db dependency).
 * Every entity created here is also appended to {@code dependentEntities}.
 *
 * @param name              simple class name of the spout/bolt instance
 * @param topologyOwner     owner of the topology; used as the Kafka topic owner, falling
 *                          back to {@code ANONYMOUS_OWNER} when blank
 * @param instance          the spout/bolt object whose fields are inspected via reflection
 * @param stormConf         the (untyped) Storm topology configuration map
 * @param dependentEntities accumulator the caller registers; entities created here are added
 * @return the data-set referenceable, or {@code null} for unrecognized component types
 * @throws IllegalAccessException if reflective field access on {@code instance} fails
 */
private Referenceable createDataSet(String name, String topologyOwner,
                                    Serializable instance,
                                    Map stormConf, List<Referenceable> dependentEntities) throws IllegalAccessException {
    Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null);

    final Referenceable dataSetReferenceable;
    // todo: need to redo this with a config driven approach
    switch (name) {
        case "KafkaSpout":
            dataSetReferenceable = createKafkaTopicDataSet(config, topologyOwner, stormConf);
            break;
        case "HBaseBolt":
            dataSetReferenceable = createHBaseTableDataSet(config, stormConf);
            break;
        case "HdfsBolt":
            dataSetReferenceable = createHdfsPathDataSet(config, stormConf);
            break;
        case "HiveBolt":
            dataSetReferenceable = createHiveTableDataSet(config, stormConf, dependentEntities);
            break;
        default:
            // custom node - create a base dataset class with name attribute
            //TODO - What should we do for custom data sets. Not sure what name we can set here?
            return null;
    }

    dependentEntities.add(dataSetReferenceable);
    return dataSetReferenceable;
}

/** Builds a kafka_topic entity from a KafkaSpout's reflected configuration. */
private Referenceable createKafkaTopicDataSet(Map<String, String> config, String topologyOwner, Map stormConf) {
    final Referenceable ref = new Referenceable(StormDataTypes.KAFKA_TOPIC.getName());
    final String topicName = config.get("KafkaSpout._spoutConfig.topic");
    // A Kafka topic has no owner of its own; fall back to a placeholder rather than leave it empty.
    final String owner = StringUtils.isEmpty(topologyOwner) ? ANONYMOUS_OWNER : topologyOwner;

    ref.set("topic", topicName);
    ref.set("uri", config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr"));
    ref.set(AtlasClient.OWNER, owner);
    ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(getClusterName(stormConf), topicName));
    ref.set(AtlasClient.NAME, topicName);
    return ref;
}

/** Builds an hbase_table entity from an HBaseBolt's reflected configuration. */
private Referenceable createHBaseTableDataSet(Map<String, String> config, Map stormConf) {
    final Referenceable ref = new Referenceable(StormDataTypes.HBASE_TABLE.getName());
    final String hbaseTableName = config.get("HBaseBolt.tableName");

    ref.set("uri", stormConf.get("hbase.rootdir"));
    ref.set(AtlasClient.NAME, hbaseTableName);
    ref.set(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal"));

    final String clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf);
    //TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName
    ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT,
            hbaseTableName));
    return ref;
}

/** Builds an hdfs_path entity from an HdfsBolt's reflected configuration. */
private Referenceable createHdfsPathDataSet(Map<String, String> config, Map stormConf) {
    final Referenceable ref = new Referenceable(HiveMetaStoreBridge.HDFS_PATH);
    // NOTE(review): using "HdfsBolt.rotationActions" as a path segment when present looks
    // suspicious — presumably "HdfsBolt.fileNameFormat.path" is always the intended source;
    // behavior kept as-is, verify against HdfsBolt's field layout.
    final String hdfsUri = config.get("HdfsBolt.rotationActions") == null
            ? config.get("HdfsBolt.fileNameFormat.path")
            : config.get("HdfsBolt.rotationActions");
    final String hdfsPathStr = config.get("HdfsBolt.fsUrl") + hdfsUri;

    ref.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
    ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, hdfsPathStr);
    ref.set("path", hdfsPathStr);
    ref.set(AtlasClient.OWNER, stormConf.get("hdfs.kerberos.principal"));

    // Name is the lower-cased path with scheme/authority stripped (e.g. "/apps/data/out").
    final Path hdfsPath = new Path(hdfsPathStr);
    ref.set(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase());
    return ref;
}

/**
 * Builds a hive_table entity from a HiveBolt's reflected configuration; the owning
 * hive_db entity is created as well and appended to {@code dependentEntities}.
 */
private Referenceable createHiveTableDataSet(Map<String, String> config, Map stormConf,
                                             List<Referenceable> dependentEntities) {
    // todo: verify if hive table has everything needed to retrieve existing table
    final String databaseName = config.get("HiveBolt.options.databaseName");
    final Referenceable dbReferenceable = new Referenceable("hive_db");
    dbReferenceable.set(AtlasClient.NAME, databaseName);
    dbReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
            HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), databaseName));
    dbReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf));
    dependentEntities.add(dbReferenceable);

    final String clusterName = extractComponentClusterName(new HiveConf(), stormConf);
    final String hiveTableName = config.get("HiveBolt.options.tableName");
    final Referenceable tableReferenceable = new Referenceable("hive_table");
    tableReferenceable.set(AtlasClient.NAME, hiveTableName);
    tableReferenceable.set(HiveMetaStoreBridge.DB, dbReferenceable);
    tableReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
            HiveMetaStoreBridge.getTableQualifiedName(clusterName, databaseName, hiveTableName));
    return tableReferenceable;
}