public void bsp()

in uima-bsp/src/main/java/org/apache/uima/bsp/BasicAEProcessingBSPJob.java [60:151]


  /**
   * Runs a UIMA analysis engine over a collection of text files in three supersteps.
   * <ol>
   *   <li>Every peer builds the analysis engine described by the <code>uima.ae.path</code>
   *       setting and a CAS pool of <code>cas.pool.size</code> entries; the master peer reads
   *       each <code>.txt</code> file under <code>collection.path</code> and sends its text to a
   *       randomly chosen peer.</li>
   *   <li>Every peer processes the documents it received and sends the resulting process trace
   *       to the master.</li>
   *   <li>The master concatenates the received traces and writes them to
   *       <code>output.file</code>.</li>
   * </ol>
   * The analysis engine is destroyed before returning, whether or not the run succeeded.
   *
   * @param bspPeer the peer this task runs on; supplies configuration, messaging and barriers
   * @throws IOException if the collection directory cannot be listed, or a file cannot be read
   *         or written
   * @throws SyncException if barrier synchronization fails
   * @throws InterruptedException if the descriptor is invalid, or the engine fails to initialize
   *         or to process a document; the original UIMA exception is attached as the cause
   */
  public void bsp(BSPPeer<KI, VI, KO, VO, BSPMessage> bspPeer) throws IOException, SyncException, InterruptedException {
    AnalysisEngine analysisEngine = null;
    try {
      Configuration configuration = bspPeer.getConfiguration();
      // first superstep

      // AE instantiation
      String aePath = configuration.get("uima.ae.path");
      AnalysisEngineDescription analysisEngineDescription = UIMAFramework.getXMLParser().parseAnalysisEngineDescription(new XMLInputSource(aePath));
      analysisEngine = UIMAFramework.produceAnalysisEngine(analysisEngineDescription);

      // AE initialization
      try {
        analysisEngine.initialize(analysisEngineDescription, new HashMap<String, Object>());
      } catch (Exception ignored) {
        // Deliberately best-effort, as in the original code.
        // NOTE(review): produceAnalysisEngine presumably returns an already-initialized engine,
        // which would make this call redundant (and its failure expected) - confirm and remove.
      }
      int casPoolSize = Integer.parseInt(configuration.get("cas.pool.size"));
      casPool = new CasPool(casPoolSize, analysisEngine);

      // collection distribution
      if (isMaster(bspPeer)) {
        String dirPath = configuration.get("collection.path");
        FileFilter textFilesOnly = new FileFilter() {
          @Override
          public boolean accept(File file) {
            return file.getAbsolutePath().endsWith(".txt");
          }
        };
        File[] documents = new File(dirPath).listFiles(textFilesOnly);
        if (documents == null) {
          // listFiles returns null when the path is not a directory or cannot be read
          throw new IOException("Cannot list collection directory: " + dirPath);
        }
        String[] peerNames = bspPeer.getAllPeerNames();
        for (File f : documents) {
          byte[] tag = new byte[2];
          r.nextBytes(tag);
          // send the document text itself (not the reader's toString()), encoded as UTF-8
          ByteMessage byteMessage = new ByteMessage(tag, readFileContent(f));
          String toPeer = peerNames[r.nextInt(peerNames.length)];
          bspPeer.send(toPeer, byteMessage);
        }
      }
      bspPeer.sync();

      // second superstep

      // receive files to analyze
      ByteMessage currentMessage;
      while ((currentMessage = (ByteMessage) bspPeer.getCurrentMessage()) != null) {
        // get a CAS
        CAS cas = casPool.getCas();
        ProcessTrace pt;
        try {
          // populate with received text; decode with the charset the sender encoded with
          cas.setDocumentText(new String(currentMessage.getData(), "UTF-8"));
          // AE execution
          pt = analysisEngine.process(cas);
        } finally {
          // release CAS even when processing fails, so the pool is not drained
          casPool.releaseCas(cas);
        }
        // send results to the (master) collector
        byte[] tag = new byte[2];
        r.nextBytes(tag);
        String message = bspPeer.getPeerName() + "\n" + pt.toString();
        bspPeer.send(master, new ByteMessage(tag, message.getBytes("UTF-8")));
      }
      bspPeer.sync();

      // third superstep

      // collect analysis results
      if (isMaster(bspPeer)) {
        StringBuilder stringBuilder = new StringBuilder();
        ByteMessage bspMessage;
        while ((bspMessage = (ByteMessage) bspPeer.getCurrentMessage()) != null) {
          stringBuilder.append(new String(bspMessage.getData(), "UTF-8"));
        }
        File f = new File(configuration.get("output.file"));
        // FileOutputStream creates the file when it does not exist yet
        FileOutputStream fileOutputStream = new FileOutputStream(f);
        try {
          fileOutputStream.write(stringBuilder.toString().getBytes("UTF-8"));
          fileOutputStream.flush();
        } finally {
          fileOutputStream.close();
        }
      }

    } catch (InvalidXMLException e) {
      throw interruptedBy(e);
    } catch (AnalysisEngineProcessException e) {
      throw interruptedBy(e);
    } catch (ResourceInitializationException e) {
      throw interruptedBy(e);
    } finally {
      // destroy the used AE, also on failure paths
      if (analysisEngine != null) {
        try {
          analysisEngine.destroy();
        } catch (Exception ignored) {
          // best-effort cleanup: a failure here must not mask the outcome of the run
        }
      }
    }
  }

  /**
   * Reads the whole text file (decoded with the platform default charset, as
   * {@link FileReader} does) and returns its content encoded as UTF-8 bytes.
   */
  private static byte[] readFileContent(File file) throws IOException {
    StringBuilder text = new StringBuilder();
    FileReader reader = new FileReader(file);
    try {
      char[] buffer = new char[4096];
      int read;
      while ((read = reader.read(buffer)) != -1) {
        text.append(buffer, 0, read);
      }
    } finally {
      reader.close();
    }
    return text.toString().getBytes("UTF-8");
  }

  /**
   * Wraps a UIMA failure in the {@link InterruptedException} this job reports to its caller,
   * keeping the original exception as the cause so its stack trace is not lost.
   */
  private static InterruptedException interruptedBy(Exception cause) {
    InterruptedException interrupted = new InterruptedException(cause.getLocalizedMessage());
    interrupted.initCause(cause);
    return interrupted;
  }