in modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java [125:285]
public boolean mediate(MessageContext synCtx) {
SynapseLog synLog = getLog(synCtx);
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Start : Aggregate mediator");
if (synLog.isTraceTraceEnabled()) {
synLog.traceTrace("Message : " + synCtx.getEnvelope());
}
}
try {
Aggregate aggregate = null;
String correlationIdName = (id != null ? EIPConstants.AGGREGATE_CORRELATION + "." + id :
EIPConstants.AGGREGATE_CORRELATION);
// if a correlateExpression is provided and there is a corresponding
// element in the current message prepare to correlate the messages on that
if (correlateExpression != null
&& correlateExpression.evaluate(synCtx) != null) {
while (aggregate == null) {
synchronized (lock) {
if (activeAggregates.containsKey(correlateExpression.toString())) {
aggregate = activeAggregates.get(correlateExpression.toString());
if (aggregate != null) {
if (!aggregate.getLock()) {
aggregate = null;
}
}
} else {
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Creating new Aggregator - " +
(completionTimeoutMillis > 0 ? "expires in : "
+ (completionTimeoutMillis / 1000) + "secs" :
"without expiry time"));
}
aggregate = new Aggregate(
synCtx.getEnvironment(),
correlateExpression.toString(),
completionTimeoutMillis,
minMessagesToComplete,
maxMessagesToComplete, this);
if (completionTimeoutMillis > 0) {
synCtx.getConfiguration().getSynapseTimer().
schedule(aggregate, completionTimeoutMillis);
}
aggregate.getLock();
activeAggregates.put(correlateExpression.toString(), aggregate);
}
}
}
} else if (synCtx.getProperty(correlationIdName) != null) {
// if the correlation cannot be found using the correlateExpression then
// try the default which is through the AGGREGATE_CORRELATION message property
// which is the unique original message id of a split or iterate operation and
// which thus can be used to uniquely group messages into aggregates
Object o = synCtx.getProperty(correlationIdName);
String correlation;
if (o != null && o instanceof String) {
correlation = (String) o;
while (aggregate == null) {
synchronized (lock) {
if (activeAggregates.containsKey(correlation)) {
aggregate = activeAggregates.get(correlation);
if (aggregate != null) {
if (!aggregate.getLock()) {
aggregate = null;
}
} else {
break;
}
} else {
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Creating new Aggregator - " +
(completionTimeoutMillis > 0 ? "expires in : "
+ (completionTimeoutMillis / 1000) + "secs" :
"without expiry time"));
}
aggregate = new Aggregate(
synCtx.getEnvironment(),
correlation,
completionTimeoutMillis,
minMessagesToComplete,
maxMessagesToComplete, this);
if (completionTimeoutMillis > 0) {
synchronized(aggregate) {
if (!aggregate.isCompleted()) {
synCtx.getConfiguration().getSynapseTimer().
schedule(aggregate, completionTimeoutMillis);
}
}
}
aggregate.getLock();
activeAggregates.put(correlation, aggregate);
}
}
}
} else {
synLog.traceOrDebug("Unable to find aggrgation correlation property");
return true;
}
} else {
synLog.traceOrDebug("Unable to find aggrgation correlation XPath or property");
return true;
}
// if there is an aggregate continue on aggregation
if (aggregate != null) {
boolean collected = aggregate.addMessage(synCtx);
if (synLog.isTraceOrDebugEnabled()) {
if (collected) {
synLog.traceOrDebug("Collected a message during aggregation");
if (synLog.isTraceTraceEnabled()) {
synLog.traceTrace("Collected message : " + synCtx);
}
}
}
// check the completeness of the aggregate and if completed aggregate the messages
// if not completed return false and block the message sequence till it completes
if (aggregate.isComplete(synLog)) {
synLog.traceOrDebug("Aggregation completed - invoking onComplete");
completeAggregate(aggregate);
synLog.traceOrDebug("End : Aggregate mediator");
return true;
} else {
aggregate.releaseLock();
}
} else {
// if the aggregation correlation cannot be found then continue the message on the
// normal path by returning true
synLog.traceOrDebug("Unable to find an aggregate for this message - skip");
return true;
}
} catch (JaxenException e) {
handleException("Unable to execute the XPATH over the message", e, synCtx);
}
synLog.traceOrDebug("End : Aggregate mediator");
return true;
}