private CAS processUntilNextOutputCas()

in uimaj-core/src/main/java/org/apache/uima/analysis_engine/asb/impl/ASB_impl.java [512:726]


    /**
     * Routes CASes through the flow until one of them is ready to be handed back to the caller.
     * <p>
     * Each pass of the outer loop picks a CAS to work on: either the next CAS offered by the
     * CasIterator on top of {@code casIteratorStack} or, once that iterator is exhausted (or has
     * failed and the flow agreed to continue), the CAS that was originally input to that CAS
     * Multiplier. The chosen CAS is then driven through {@link SimpleStep}s and
     * {@link ParallelStep}s until its flow yields a {@link FinalStep}.
     *
     * @return a CAS produced within this aggregate that reached its FinalStep and is to be output
     *         ({@code mOutputNewCASes} is set and the FinalStep did not force a drop), or
     *         {@code null} when the input CAS reached its FinalStep or the CasIterator stack is
     *         empty
     * @throws AnalysisEngineProcessException
     *           if a step names an unknown analysis engine key, the step type is unsupported, the
     *           flow tries to drop the input CAS, or a failure occurs that the flow does not
     *           allow to be skipped. Any other exception is wrapped. Before the exception
     *           propagates the current flow (if any) is notified via {@code aborted()} and
     *           {@code release()} is called.
     */
    private CAS processUntilNextOutputCas() throws AnalysisEngineProcessException {
      FlowContainer flow = null;
      try {
        while (true) {
          CAS cas = null;
          Step nextStep = null;
          flow = null;
          // get an initial CAS from the CasIteratorStack
          while (cas == null) {
            if (casIteratorStack.isEmpty()) {
              return null; // there are no more CAS Iterators to obtain CASes from
            }
            StackFrame frame = casIteratorStack.peek();
            try {
              if (frame.casIterator.hasNext()) {
                cas = frame.casIterator.next();
                // this is a new output CAS so we need to compute a flow for it
                flow = frame.originalCasFlow.newCasProduced(cas, frame.casMultiplierAeKey);
              }
            } catch (Exception e) {
              // A CAS Multiplier (or possibly an aggregate) threw an exception trying to output
              // the next CAS. We abandon trying to get further output CASes from that CAS
              // Multiplier, and ask the Flow Controller if we should continue routing the CAS
              // that was input to the CasMultiplier.
              rethrowUnlessFlowContinues(frame.originalCasFlow, frame.casMultiplierAeKey, e);
              if (cas != null) {
                // next() succeeded but computing the flow for the new CAS failed. That CAS has
                // no flow and is not tracked in activeCASes, so it cannot be routed: release it
                // and reset cas so that we really do fall through to the original CAS below
                // (previously this path reached flow.next() with a null flow).
                cas.release();
                cas = null;
              }
            }
            if (cas == null) {
              // we've finished routing all the Output CASes from a StackFrame. Now
              // get the originalCas (the one that was input to the CasMultiplier) from
              // that stack frame and continue with its flow
              cas = frame.originalCas;
              flow = frame.originalCasFlow;
              nextStep = frame.incompleteParallelStep; // in case we need to resume a parallel step
              cas.setCurrentComponentInfo(null); // this CAS is done being processed by the previous
                                                 // AnalysisComponent
              casIteratorStack.pop(); // remove this state from the stack now
            }
          }

          // record active CASes in case we encounter an exception and need to release them
          activeCASes.add(cas);

          // if we're not in the middle of parallel step already, ask the FlowController
          // for the next step
          if (nextStep == null) {
            nextStep = flow.next();
          }

          // repeat until we reach a FinalStep
          while (!(nextStep instanceof FinalStep)) {
            if (nextStep instanceof SimpleStep simpleStep) {
              // Simple Step
              String nextAeKey = simpleStep.getAnalysisEngineKey();
              AnalysisEngine nextAe = mComponentAnalysisEngineMap.get(nextAeKey);
              if (nextAe == null) {
                throw new AnalysisEngineProcessException(
                        AnalysisEngineProcessException.UNKNOWN_ID_IN_SEQUENCE,
                        new Object[] { nextAeKey });
              }
              // check if we have to set result spec, to support capability language flow
              if (nextStep instanceof SimpleStepWithResultSpec stepWithResultSpec) {
                ResultSpecification rs = stepWithResultSpec.getResultSpecification();
                if (rs != null) {
                  nextAe.setResultSpecification(rs);
                }
              }
              // invoke next AE in flow
              CasIterator casIter = null;
              CAS outputCas = null; // used if the AE we call outputs a new CAS
              try {
                casIter = nextAe.processAndOutputNewCASes(cas);
                if (casIter.hasNext()) {
                  outputCas = casIter.next();
                }
              } catch (Exception e) {
                // ask the FlowController if we should continue
                // TODO: should this be configurable?
                rethrowUnlessFlowContinues(flow, nextAeKey, e);
              }
              if (outputCas != null) {
                // new CASes are output:
                // push the CasIterator, original CAS, and Flow onto a stack so we
                // can get the other output CASes and the original CAS later
                casIteratorStack.push(new StackFrame(casIter, cas, flow, nextAeKey));
                // compute Flow for the output CAS
                flow = flow.newCasProduced(outputCas, nextAeKey);
                // now route the output CAS through the flow
                cas = outputCas;
                activeCASes.add(cas);
              } else {
                // no new CASes are output; this cas is done being processed
                // by that AnalysisEngine so clear the componentInfo
                cas.setCurrentComponentInfo(null);
              }
            } else if (nextStep instanceof ParallelStep parallelStep) {
              // ParallelStep (TODO: refactor out common parts with SimpleStep?)
              // create modifiable list of destinations
              List<String> destinations = new LinkedList<>(parallelStep.getAnalysisEngineKeys());
              // iterate over all destinations, removing them from the list as we go
              while (!destinations.isEmpty()) {
                String nextAeKey = destinations.remove(0);
                // execute this step as we would a single step
                AnalysisEngine nextAe = mComponentAnalysisEngineMap.get(nextAeKey);
                if (nextAe == null) {
                  throw new AnalysisEngineProcessException(
                          AnalysisEngineProcessException.UNKNOWN_ID_IN_SEQUENCE,
                          new Object[] { nextAeKey });
                }
                // invoke next AE in flow
                CasIterator casIter = null;
                CAS outputCas = null; // used if the AE we call outputs a new CAS
                try {
                  casIter = nextAe.processAndOutputNewCASes(cas);
                  if (casIter.hasNext()) {
                    outputCas = casIter.next();
                  }
                } catch (Exception e) {
                  // ask the FlowController if we should continue
                  // TODO: should this be configurable?
                  rethrowUnlessFlowContinues(flow, nextAeKey, e);
                }
                if (outputCas != null) {
                  // new CASes are output:
                  // when pushing the stack frame so we know where to pick up later,
                  // be sure to include the incomplete ParallelStep
                  if (!destinations.isEmpty()) {
                    casIteratorStack.push(new StackFrame(casIter, cas, flow, nextAeKey,
                            new ParallelStep(destinations)));
                  } else {
                    casIteratorStack.push(new StackFrame(casIter, cas, flow, nextAeKey));
                  }

                  // compute Flow for the output CAS and begin routing it through the flow
                  flow = flow.newCasProduced(outputCas, nextAeKey);
                  cas = outputCas;
                  activeCASes.add(cas);
                  break; // break out of processing of ParallelStep
                } else {
                  // no new CASes are output; this cas is done being processed
                  // by that AnalysisEngine so clear the componentInfo
                  cas.setCurrentComponentInfo(null);
                }
              }
            } else {
              throw new AnalysisEngineProcessException(
                      AnalysisEngineProcessException.UNSUPPORTED_STEP_TYPE,
                      new Object[] { nextStep.getClass() });
            }
            nextStep = flow.next();
          }
          // FinalStep was returned from FlowController (the loop above exits only then).
          // We're done with the CAS.
          FinalStep finalStep = (FinalStep) nextStep;
          activeCASes.remove(cas);
          // If this is the input CAS, just return null to indicate we're done
          // processing it. It is an error if the FlowController tried to drop this CAS.
          if (cas == mInputCas) {
            if (finalStep.getForceCasToBeDropped()) {
              throw new AnalysisEngineProcessException(
                      AnalysisEngineProcessException.ILLEGAL_DROP_CAS, new Object[0]);
            }
            return null;
          }
          // Otherwise, this is a new CAS produced within this Aggregate. We may or
          // may not return it, depending on the setting of the outputsNewCASes operational
          // property in this AE's metadata, and on the value of FinalStep.forceCasToBeDropped
          if (mOutputNewCASes && !finalStep.getForceCasToBeDropped()) {
            return cas;
          }
          cas.release();
        }
      } catch (Exception e) {
        try {
          // notify Flow that processing has aborted on this CAS
          if (flow != null) {
            flow.aborted();
          }
        } finally {
          // release held CASes before throwing, even if the abort notification itself fails
          release();
        }
        if (e instanceof AnalysisEngineProcessException analysisEngineProcessException) {
          throw analysisEngineProcessException;
        }
        throw new AnalysisEngineProcessException(e);
      }
    }

    /**
     * Asks the given flow whether processing may continue after a component failed. If the flow
     * says no, the failure is rethrown unchanged; otherwise it is logged at FINE level and this
     * method returns normally.
     *
     * @param flow
     *          the flow of the CAS that was being processed when the failure occurred
     * @param failedAeKey
     *          key of the analysis engine that failed
     * @param failure
     *          the exception that was caught
     * @throws Exception
     *           {@code failure} itself, when the flow does not allow continuing
     */
    private void rethrowUnlessFlowContinues(FlowContainer flow, String failedAeKey,
            Exception failure) throws Exception {
      if (!flow.continueOnFailure(failedAeKey, failure)) {
        throw failure;
      }
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
              "processUntilNextOutputCas", LOG_RESOURCE_BUNDLE,
              "UIMA_continuing_after_exception__FINE", failure);
    }