in modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java [255:381]
private boolean processRequestMessage(MessageContext synCtx,
SynapseLog synLog, CacheManager cacheManager) throws ClusteringFault {
if (collector) {
handleException("Request messages cannot be handled in a collector cache", synCtx);
}
OperationContext opCtx = ((Axis2MessageContext)synCtx).getAxis2MessageContext().
getOperationContext();
String requestHash = null;
try {
requestHash = digestGenerator.getDigest(
((Axis2MessageContext) synCtx).getAxis2MessageContext());
synCtx.setProperty(CachingConstants.REQUEST_HASH, requestHash);
} catch (CachingException e) {
handleException("Error in calculating the hash value of the request", e, synCtx);
}
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Generated request hash : " + requestHash);
}
ServiceName service;
if (id != null) {
service = new ServiceName(id);
} else {
service = new ServiceName(cacheKey);
}
RequestHash hash = new RequestHash(requestHash);
CachableResponse cachedResponse =
cacheManager.getCachedResponse(service, hash);
org.apache.axis2.context.MessageContext msgCtx = ((Axis2MessageContext)synCtx).getAxis2MessageContext();
opCtx.setNonReplicableProperty(CachingConstants.REQUEST_HASH, requestHash);
CacheReplicationCommand cacheReplicationCommand = new CacheReplicationCommand();
if (cachedResponse != null) {
// get the response from the cache and attach to the context and change the
// direction of the message
if (!cachedResponse.isExpired()) {
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Cache-hit for message ID : " + synCtx.getMessageID());
}
cachedResponse.setInUse(true);
// mark as a response and replace envelope from cache
synCtx.setResponse(true);
opCtx.setNonReplicableProperty(CachingConstants.CACHED_OBJECT, cachedResponse);
SOAPEnvelope omSOAPEnv;
try {
omSOAPEnv = SOAPMessageHelper.buildSOAPEnvelopeFromBytes(
cachedResponse.getResponseEnvelope());
if (omSOAPEnv != null) {
synCtx.setEnvelope(omSOAPEnv);
}
} catch (AxisFault axisFault) {
handleException("Error setting response envelope from cache : "
+ cacheKey, synCtx);
} catch (IOException ioe) {
handleException("Error setting response envelope from cache : "
+ cacheKey, ioe, synCtx);
} catch (SOAPException soape) {
handleException("Error setting response envelope from cache : "
+ cacheKey, soape, synCtx);
}
// take specified action on cache hit
if (onCacheHitSequence != null) {
// if there is an onCacheHit use that for the mediation
synLog.traceOrDebug("Delegating message to the onCachingHit "
+ "Anonymous sequence");
onCacheHitSequence.mediate(synCtx);
} else if (onCacheHitRef != null) {
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Delegating message to the onCachingHit " +
"sequence : " + onCacheHitRef);
}
synCtx.getSequence(onCacheHitRef).mediate(synCtx);
} else {
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Request message " + synCtx.getMessageID() +
" was served from the cache : " + cacheKey);
}
// send the response back if there is not onCacheHit is specified
synCtx.setTo(null);
Axis2Sender.sendBack(synCtx);
}
// stop any following mediators from executing
return false;
} else {
cachedResponse.reincarnate(timeout);
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Existing cached response has expired. Reset cache element");
}
cacheManager.cacheResponse(service, hash, cachedResponse, cacheReplicationCommand);
opCtx.setNonReplicableProperty(CachingConstants.CACHED_OBJECT, cachedResponse);
opCtx.setNonReplicableProperty(CachingConstants.STATE_REPLICATION_OBJECT,
cacheReplicationCommand);
Replicator.replicate(opCtx);
}
} else {
// if not found in cache, check if we can cache this request
if (cacheManager.getCacheSize(service) >= inMemoryCacheSize) { // If cache is full
cacheManager.removeExpiredResponses(service, cacheReplicationCommand); // try to remove expired responses
if (cacheManager.getCacheSize(service) >= inMemoryCacheSize) { // recheck if there is space
if (log.isDebugEnabled()) {
log.debug("In-memory cache is full. Unable to cache");
}
} else { // if we managed to free up some space in the cache. Need state replication
cacheNewResponse(msgCtx, service, hash, cacheManager,
cacheReplicationCommand);
}
} else { // if there is more space in the cache. Need state replication
cacheNewResponse(msgCtx, service, hash, cacheManager,
cacheReplicationCommand);
}
}
return true;
}