in metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java [178:237]
public void execute(FileSystem fs, String... argv) throws Exception {
CommandLine cli = ModelSubmissionOptions.parse(new PosixParser(), argv);
if(ModelSubmissionOptions.LOG4J_PROPERTIES.has(cli)) {
Log4jPropertyHelper.updateLog4jConfiguration(ModelSubmission.class, ModelSubmissionOptions.LOG4J_PROPERTIES.get(cli));
}
ModelRequest request = null;
CuratorFramework client = null;
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.newClient(ModelSubmissionOptions.ZK_QUORUM.get(cli), retryPolicy);
client.start();
MaaSConfig config = ConfigUtil.INSTANCE.read(client, ModelSubmissionOptions.ZK_ROOT.get(cli, "/metron/maas/config"), new MaaSConfig(), MaaSConfig.class);
String mode = ModelSubmissionOptions.MODE.get(cli);
if ( mode.equalsIgnoreCase("ADD")) {
request = new ModelRequest() {{
setName(ModelSubmissionOptions.NAME.get(cli));
setAction(Action.ADD);
setVersion(ModelSubmissionOptions.VERSION.get(cli));
setNumInstances(Integer.parseInt(ModelSubmissionOptions.NUM_INSTANCES.get(cli)));
setMemory(Integer.parseInt(ModelSubmissionOptions.MEMORY.get(cli)));
setPath(ModelSubmissionOptions.HDFS_MODEL_PATH.get(cli));
}};
} else if(mode.equalsIgnoreCase("REMOVE")) {
request = new ModelRequest() {{
setName(ModelSubmissionOptions.NAME.get(cli));
setAction(Action.REMOVE);
setNumInstances(Integer.parseInt(ModelSubmissionOptions.NUM_INSTANCES.get(cli)));
setVersion(ModelSubmissionOptions.VERSION.get(cli));
}};
}
else if(mode.equalsIgnoreCase("LIST")) {
String name = ModelSubmissionOptions.NAME.get(cli, null);
String version = ModelSubmissionOptions.VERSION.get(cli, null);
ServiceDiscoverer serviceDiscoverer = new ServiceDiscoverer(client, config.getServiceRoot());
Model model = new Model(name, version);
Map<Model, List<ModelEndpoint>> endpoints = serviceDiscoverer.listEndpoints(model);
for(Map.Entry<Model, List<ModelEndpoint>> kv : endpoints.entrySet()) {
String modelTitle = "Model " + kv.getKey().getName() + " @ " + kv.getKey().getVersion();
System.out.println(modelTitle);
for(ModelEndpoint endpoint : kv.getValue()){
System.out.println(endpoint);
}
}
}
if (ModelSubmissionOptions.LOCAL_MODEL_PATH.has(cli)) {
File localDir = new File(ModelSubmissionOptions.LOCAL_MODEL_PATH.get(cli));
Path hdfsPath = new Path(ModelSubmissionOptions.HDFS_MODEL_PATH.get(cli));
updateHDFS(fs, localDir, hdfsPath);
}
Queue<ModelRequest> queue = config.createQueue(ImmutableMap.of(ZKQueue.ZK_CLIENT, client));
queue.enqueue(request);
} finally {
if (client != null) {
client.close();
}
}
}