public boolean mediate()

in modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java [125:285]


    public boolean mediate(MessageContext synCtx) {

        SynapseLog synLog = getLog(synCtx);

        if (synLog.isTraceOrDebugEnabled()) {
            synLog.traceOrDebug("Start : Aggregate mediator");

            if (synLog.isTraceTraceEnabled()) {
                synLog.traceTrace("Message : " + synCtx.getEnvelope());
            }
        }

        try {
            Aggregate aggregate = null;
            String correlationIdName = (id != null ? EIPConstants.AGGREGATE_CORRELATION + "." + id :
                    EIPConstants.AGGREGATE_CORRELATION);
            // if a correlateExpression is provided and there is a corresponding
            // element in the current message prepare to correlate the messages on that
            if (correlateExpression != null
                    && correlateExpression.evaluate(synCtx) != null) {

                while (aggregate == null) {

                    synchronized (lock) {

                        if (activeAggregates.containsKey(correlateExpression.toString())) {

                            aggregate = activeAggregates.get(correlateExpression.toString());
                            if (aggregate != null) {
                                if (!aggregate.getLock()) {
                                    aggregate = null;
                                }
                            }

                        } else {

                            if (synLog.isTraceOrDebugEnabled()) {
                                synLog.traceOrDebug("Creating new Aggregator - " +
                                        (completionTimeoutMillis > 0 ? "expires in : "
                                                + (completionTimeoutMillis / 1000) + "secs" :
                                                "without expiry time"));
                            }

                            aggregate = new Aggregate(
                                    synCtx.getEnvironment(),
                                    correlateExpression.toString(),
                                    completionTimeoutMillis,
                                    minMessagesToComplete,
                                    maxMessagesToComplete, this);

                            if (completionTimeoutMillis > 0) {
                                synCtx.getConfiguration().getSynapseTimer().
                                        schedule(aggregate, completionTimeoutMillis);
                            }
                            aggregate.getLock();
                            activeAggregates.put(correlateExpression.toString(), aggregate);
                        }
                    }
                }

            } else if (synCtx.getProperty(correlationIdName) != null) {
                // if the correlation cannot be found using the correlateExpression then
                // try the default which is through the AGGREGATE_CORRELATION message property
                // which is the unique original message id of a split or iterate operation and
                // which thus can be used to uniquely group messages into aggregates

                Object o = synCtx.getProperty(correlationIdName);
                String correlation;

                if (o != null && o instanceof String) {
                    correlation = (String) o;
                    while (aggregate == null) {
                        synchronized (lock) {
                            if (activeAggregates.containsKey(correlation)) {
                                aggregate = activeAggregates.get(correlation);
                                if (aggregate != null) {
                                    if (!aggregate.getLock()) {
                                        aggregate = null;
                                    }
                                } else {
                                    break;
                                }
                            } else {
                                if (synLog.isTraceOrDebugEnabled()) {
                                    synLog.traceOrDebug("Creating new Aggregator - " +
                                            (completionTimeoutMillis > 0 ? "expires in : "
                                                    + (completionTimeoutMillis / 1000) + "secs" :
                                                    "without expiry time"));
                                }
                        
                                aggregate = new Aggregate(
                                        synCtx.getEnvironment(),
                                        correlation,
                                        completionTimeoutMillis,
                                        minMessagesToComplete,
                                        maxMessagesToComplete, this);

                                if (completionTimeoutMillis > 0) {
                                    synchronized(aggregate) {
                                        if (!aggregate.isCompleted()) {
                                            synCtx.getConfiguration().getSynapseTimer().
                                                schedule(aggregate, completionTimeoutMillis);
                                        }
                                    }
                                }
                                aggregate.getLock();
                                activeAggregates.put(correlation, aggregate);
                            }
                        }
                    }
                    
                } else {
                    synLog.traceOrDebug("Unable to find aggrgation correlation property");
                    return true;
                }
            } else {
                synLog.traceOrDebug("Unable to find aggrgation correlation XPath or property");
                return true;
            }

            // if there is an aggregate continue on aggregation
            if (aggregate != null) {
                boolean collected = aggregate.addMessage(synCtx);
                if (synLog.isTraceOrDebugEnabled()) {
                    if (collected) {
                        synLog.traceOrDebug("Collected a message during aggregation");
                        if (synLog.isTraceTraceEnabled()) {
                            synLog.traceTrace("Collected message : " + synCtx);
                        }
                    }
                }
                
                // check the completeness of the aggregate and if completed aggregate the messages
                // if not completed return false and block the message sequence till it completes

                if (aggregate.isComplete(synLog)) {
                    synLog.traceOrDebug("Aggregation completed - invoking onComplete");
                    completeAggregate(aggregate);
                    
                    synLog.traceOrDebug("End : Aggregate mediator");
                    return true;
                } else {
                    aggregate.releaseLock();
                }

            } else {
                // if the aggregation correlation cannot be found then continue the message on the
                // normal path by returning true

                synLog.traceOrDebug("Unable to find an aggregate for this message - skip");
                return true;
            }

        } catch (JaxenException e) {
            handleException("Unable to execute the XPATH over the message", e, synCtx);
        }

        synLog.traceOrDebug("End : Aggregate mediator");

        return true;
    }