in samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologySubmitter.java [55:123]
public static void main(String[] args) throws IOException {
Properties props = StormSamoaUtils.getProperties();
String uploadedJarLocation = props.getProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
if (uploadedJarLocation == null) {
logger.error("Invalid properties file. It must have key {}",
StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
return;
}
List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
int numWorkers = StormSamoaUtils.numWorkers(tmpArgs);
args = tmpArgs.toArray(new String[0]);
StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
Config conf = new Config();
conf.putAll(Utils.readStormConfig());
conf.putAll(Utils.readCommandLineOpts());
conf.setDebug(false);
conf.setNumWorkers(numWorkers);
String profilerOption =
props.getProperty(StormTopologySubmitter.YJP_OPTIONS_KEY);
if (profilerOption != null) {
String topoWorkerChildOpts = (String) conf.get(Config.TOPOLOGY_WORKER_CHILDOPTS);
StringBuilder optionBuilder = new StringBuilder();
if (topoWorkerChildOpts != null) {
optionBuilder.append(topoWorkerChildOpts);
optionBuilder.append(' ');
}
optionBuilder.append(profilerOption);
conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, optionBuilder.toString());
}
Map<String, Object> myConfigMap = new HashMap<String, Object>(conf);
StringWriter out = new StringWriter();
try {
JSONValue.writeJSONString(myConfigMap, out);
} catch (IOException e) {
System.out.println("Error in writing JSONString");
e.printStackTrace();
return;
}
Config config = new Config();
config.putAll(Utils.readStormConfig());
NimbusClient nc = NimbusClient.getConfiguredClient(config);
String topologyName = stormTopo.getTopologyName();
try {
System.out.println("Submitting topology with name: "
+ topologyName);
nc.getClient().submitTopology(topologyName, uploadedJarLocation,
out.toString(), stormTopo.getStormBuilder().createTopology());
System.out.println(topologyName + " is successfully submitted");
} catch (AlreadyAliveException aae) {
System.out.println("Fail to submit " + topologyName
+ "\nError message: " + aae.get_msg());
} catch (InvalidTopologyException ite) {
System.out.println("Invalid topology for " + topologyName);
ite.printStackTrace();
} catch (TException te) {
System.out.println("Texception for " + topologyName);
te.printStackTrace();
}
}