in uima-bsp/src/main/java/org/apache/uima/bsp/BasicAEProcessingBSPJob.java [60:151]
/**
 * Runs a UIMA analysis engine over a collection of text files in three supersteps.
 * <ol>
 * <li>Every peer builds the analysis engine described by the <code>uima.ae.path</code> property and a
 * CAS pool of <code>cas.pool.size</code> entries; the master peer then reads each <code>.txt</code> file
 * found under <code>collection.path</code> and sends its content to a randomly chosen peer.</li>
 * <li>Every peer processes the documents it received and sends its peer name plus the resulting
 * process trace to the master.</li>
 * <li>The master concatenates all received results and writes them to <code>output.file</code>.</li>
 * </ol>
 * The analysis engine is destroyed before the method returns, whether it completes or fails.
 *
 * @param bspPeer the peer this task runs on; provides configuration, messaging and synchronization
 * @throws IOException if a required property is missing, the collection directory cannot be listed,
 *         or reading a document / writing the output fails
 * @throws SyncException propagated from the barrier synchronization
 * @throws InterruptedException propagated from the barrier synchronization, or wrapping (as cause) a
 *         UIMA descriptor parsing, initialization or processing failure
 */
public void bsp(BSPPeer<KI, VI, KO, VO, BSPMessage> bspPeer) throws IOException, SyncException, InterruptedException {
  AnalysisEngine analysisEngine = null;
  try {
    Configuration configuration = bspPeer.getConfiguration();

    // first superstep
    // AE instantiation
    String aePath = configuration.get("uima.ae.path");
    AnalysisEngineDescription analysisEngineDescription = UIMAFramework.getXMLParser().parseAnalysisEngineDescription(new XMLInputSource(aePath));
    analysisEngine = UIMAFramework.produceAnalysisEngine(analysisEngineDescription);

    // AE initialization
    try {
      analysisEngine.initialize(analysisEngineDescription, new HashMap<String, Object>());
    } catch (Exception ignored) {
      // Deliberately best-effort: a failure of this explicit initialization must not abort the job.
      // NOTE(review): presumably the engine returned by produceAnalysisEngine is already
      // initialized, which would make this call redundant - confirm and consider removing it.
    }

    int casPoolSize = Integer.parseInt(configuration.get("cas.pool.size"));
    casPool = new CasPool(casPoolSize, analysisEngine);

    // collection distribution
    if (isMaster(bspPeer)) {
      String dirPath = configuration.get("collection.path");
      if (dirPath == null) {
        throw new IOException("missing configuration property: collection.path");
      }
      File[] textFiles = new File(dirPath).listFiles(new FileFilter() {
        @Override
        public boolean accept(File file) {
          return file.getAbsolutePath().endsWith(".txt");
        }
      });
      if (textFiles == null) {
        // listFiles returns null (not an empty array) when the path is not a listable directory
        throw new IOException("collection.path is not a readable directory: " + dirPath);
      }

      String[] peerNames = bspPeer.getAllPeerNames();
      for (File f : textFiles) {
        byte[] tag = new byte[2];
        r.nextBytes(tag);
        // ship the document text itself, encoded as UTF-8
        ByteMessage byteMessage = new ByteMessage(tag, readFileContent(f).getBytes("UTF-8"));
        String toPeer = peerNames[r.nextInt(peerNames.length)];
        bspPeer.send(toPeer, byteMessage);
      }
    }
    bspPeer.sync();

    // second superstep
    // receive files to analyze
    ByteMessage currentMessage;
    while ((currentMessage = (ByteMessage) bspPeer.getCurrentMessage()) != null) {
      // get a CAS
      CAS cas = casPool.getCas();
      ProcessTrace pt;
      try {
        // populate with received text; payload was encoded as UTF-8 by the sender
        cas.setDocumentText(new String(currentMessage.getData(), "UTF-8"));
        // AE execution
        pt = analysisEngine.process(cas);
      } finally {
        // always hand the CAS back so a failing document cannot drain the pool
        casPool.releaseCas(cas);
      }
      // send results to the (master) collector
      byte[] tag = new byte[2];
      r.nextBytes(tag);
      String message = bspPeer.getPeerName() + "\n" + pt.toString();
      bspPeer.send(master, new ByteMessage(tag, message.getBytes("UTF-8")));
    }
    bspPeer.sync();

    // third superstep
    // collect analysis results
    if (isMaster(bspPeer)) {
      StringBuilder stringBuilder = new StringBuilder();
      ByteMessage bspMessage;
      while ((bspMessage = (ByteMessage) bspPeer.getCurrentMessage()) != null) {
        stringBuilder.append(new String(bspMessage.getData(), "UTF-8"));
      }
      // FileOutputStream creates the file if needed (and truncates an existing one)
      FileOutputStream fileOutputStream = new FileOutputStream(new File(configuration.get("output.file")));
      try {
        fileOutputStream.write(stringBuilder.toString().getBytes("UTF-8"));
        fileOutputStream.flush();
      } finally {
        fileOutputStream.close();
      }
    }
  } catch (InvalidXMLException e) {
    throw asInterrupted(e);
  } catch (AnalysisEngineProcessException e) {
    throw asInterrupted(e);
  } catch (ResourceInitializationException e) {
    throw asInterrupted(e);
  } finally {
    // destroy the used AE, also when a superstep failed
    if (analysisEngine != null) {
      try {
        analysisEngine.destroy();
      } catch (Exception ignored) {
        // best-effort cleanup: a failing destroy must not mask the job's outcome
      }
    }
  }
}

/** Reads the whole content of the given file as text (decoded with the platform default charset, as FileReader does). */
private static String readFileContent(File file) throws IOException {
  StringBuilder content = new StringBuilder();
  FileReader reader = new FileReader(file);
  try {
    char[] buffer = new char[4096];
    int read;
    while ((read = reader.read(buffer)) != -1) {
      content.append(buffer, 0, read);
    }
  } finally {
    reader.close();
  }
  return content.toString();
}

/** Converts a failure into an InterruptedException carrying the same localized message and the original exception as cause. */
private static InterruptedException asInterrupted(Exception cause) {
  InterruptedException interrupted = new InterruptedException(cause.getLocalizedMessage());
  interrupted.initCause(cause);
  return interrupted;
}