in evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java [2073:2307]
/**
 * Reads the values for {@code keys} in one bulk operation and returns them keyed by the
 * caller supplied (decanonicalized) key.
 *
 * <p>Processing order:
 * <ol>
 *   <li>Every key is converted with {@code getEVCacheKey}; when {@code _useInMemoryCache} is set the
 *       in-memory cache is consulted first and hits are put straight into the result.</li>
 *   <li>The keys not served from memory are fetched from the read client through {@code getBulkData}.</li>
 *   <li>When {@code hasZoneFallbackForBulk()} is true, a null/empty result is retried against the clients
 *       returned by {@code getEVCacheClientsForReadExcluding}; a partial result is completed the same way
 *       (missing keys only) when {@code _bulkPartialZoneFallbackFP} is set.</li>
 *   <li>Remote results are merged into the result map and metrics are recorded in the {@code finally} block.</li>
 * </ol>
 *
 * @param keys       keys to read; must not be {@code null}
 * @param tc         transcoder handed to {@code getBulkData}; when {@code null} the in-memory lookup uses
 *                   {@code _transcoder} or, failing that, the transcoder of a read client
 * @param touch      when {@code true}, {@code touchData(key, timeToLive)} is called for every key found remotely
 * @param timeToLive passed to {@code checkTTL}, stored on the event and used for the touch
 * @return the values found, keyed by the original key. Keys that were looked up remotely and not found are
 *         present with a {@code null} value, unless the remote lookup itself returned {@code null}. An empty
 *         map is returned when {@code keys} is empty, no read client is available or the request is throttled.
 *         {@code null} is returned when an exception occurs (or a throttle check throws) and
 *         {@code doThrowException()} is {@code false}.
 * @throws IllegalArgumentException if {@code keys} is {@code null}
 * @throws EVCacheException         if {@code doThrowException()} is {@code true} and the read fails, times out,
 *                                  is throttled or no client is available; also when the in-memory lookup
 *                                  raises an {@code ExecutionException}
 */
private <T> Map<String, T> getBulk(final Collection<String> keys, Transcoder<T> tc, boolean touch, int timeToLive) throws EVCacheException {
    if (null == keys) throw new IllegalArgumentException();
    if (keys.isEmpty()) return Collections.<String, T> emptyMap();
    checkTTL(timeToLive, Call.BULK);
    final boolean throwExc = doThrowException();
    final EVCacheClient client = _pool.getEVCacheClientForRead();
    if (client == null) {
        incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.BULK);
        if (throwExc) throw new EVCacheException("Could not find a client to get the data in bulk");
        return Collections.<String, T> emptyMap();// Fast failure
    }

    final Map<String, T> decanonicalR = new HashMap<String, T>((keys.size() * 4) / 3 + 1);
    final Collection<EVCacheKey> evcKeys = new ArrayList<EVCacheKey>();
    /* Canonicalize keys and serve whatever is possible from the in-memory cache */
    for (String k : keys) {
        final EVCacheKey evcKey = getEVCacheKey(k);
        T value = null;
        if (_useInMemoryCache.get()) {
            try {
                final Transcoder<T> transcoder = (tc == null) ? ((_transcoder == null) ? (Transcoder<T>) _pool.getEVCacheClientForRead().getTranscoder() : (Transcoder<T>) _transcoder) : tc;
                value = (T) getInMemoryCache(transcoder).get(evcKey);
                if (value == null && log.isInfoEnabled() && shouldLog()) log.info("Value not_found in inmemory cache for APP " + _appName + ", key : " + evcKey + "; value : " + value );
            } catch (ExecutionException e) {
                if (log.isDebugEnabled() && shouldLog()) log.debug("ExecutionException while getting data from InMemory Cache", e);
                throw new EVCacheException("ExecutionException", e);
            }
        }
        if (value == null) {
            // Not served from memory: this key has to be fetched remotely.
            evcKeys.add(evcKey);
        } else {
            decanonicalR.put(evcKey.getKey(), value);
            if (log.isDebugEnabled() && shouldLog()) log.debug("Value retrieved from inmemory cache for APP " + _appName + ", key : " + evcKey + (log.isTraceEnabled() ? "; value : " + value : ""));
        }
    }

    // Nothing left to fetch remotely means every key was an in-memory hit. Checking only evcKeys
    // (and not decanonicalR.size() == keys.size()) keeps this short-circuit working when keys
    // contains duplicates, which collapse in the result map.
    if (evcKeys.isEmpty()) {
        if (log.isDebugEnabled() && shouldLog()) log.debug("All Values retrieved from inmemory cache for APP " + _appName + ", keys : " + keys + (log.isTraceEnabled() ? "; value : " + decanonicalR : ""));
        return decanonicalR;
    }

    final EVCacheEvent event = createEVCacheEvent(Collections.singletonList(client), Call.BULK);
    if (event != null) {
        event.setEVCacheKeys(evcKeys);
        try {
            if (shouldThrottle(event)) {
                incrementFastFail(EVCacheMetricsFactory.THROTTLED, Call.BULK);
                if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & keys " + keys);
                return Collections.<String, T> emptyMap();
            }
        } catch(EVCacheException ex) {
            if(throwExc) throw ex;
            incrementFastFail(EVCacheMetricsFactory.THROTTLED, Call.BULK);
            // NOTE(review): this path returns null while the non-exception throttle path above
            // returns an empty map; left unchanged because callers may rely on it - confirm.
            return null;
        }
        event.setTTL(timeToLive);
        startEvent(event);
    }

    final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
    String cacheOperation = EVCacheMetricsFactory.YES;
    int tries = 1;
    String status = EVCacheMetricsFactory.SUCCESS;
    try {
        final boolean hasZF = hasZoneFallbackForBulk();
        // With zone fallback available the first attempt must not throw, so the fallback gets a chance.
        boolean throwEx = hasZF ? false : throwExc;
        Map<EVCacheKey, T> retMap = getBulkData(client, evcKeys, tc, throwEx, hasZF);
        List<EVCacheClient> fbClients = null;
        if (hasZF) {
            if (retMap == null || retMap.isEmpty()) {
                /* Full miss on the primary client: retry all keys on the other server groups */
                fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup());
                if (fbClients != null && !fbClients.isEmpty()) {
                    for (int i = 0; i < fbClients.size(); i++) {
                        final EVCacheClient fbClient = fbClients.get(i);
                        final boolean lastFallback = i >= fbClients.size() - 1;
                        // Only the last fallback attempt is allowed to propagate its exception.
                        if (lastFallback) throwEx = throwExc;
                        if (event != null) {
                            // NOTE(review): the throttle returns below leave the started event without
                            // an endEvent/eventError call - confirm whether that is intended.
                            try {
                                if (shouldThrottle(event)) {
                                    status = EVCacheMetricsFactory.THROTTLED;
                                    if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & key " + evcKeys);
                                    return null;
                                }
                            } catch(EVCacheException ex) {
                                if(throwExc) throw ex;
                                status = EVCacheMetricsFactory.THROTTLED;
                                return null;
                            }
                        }
                        tries++;
                        retMap = getBulkData(fbClient, evcKeys, tc, throwEx, !lastFallback);
                        if (log.isDebugEnabled() && shouldLog()) log.debug("Fallback for APP " + _appName + ", key [" + evcKeys + (log.isTraceEnabled() ? "], Value [" + retMap : "") + "], zone : " + fbClient.getZone());
                        if (retMap != null && !retMap.isEmpty()) break;
                    }
                    //increment("BULK-FULL_RETRY-" + ((retMap == null || retMap.isEmpty()) ? "MISS" : "HIT"));
                }
            } else if (evcKeys.size() > retMap.size() && _bulkPartialZoneFallbackFP.get()) {
                /*
                 * Partial miss: retry only the keys still missing. retMap holds results for evcKeys
                 * (the keys not served from memory), so it is compared with evcKeys.size(); comparing
                 * with keys.size() made this branch fire whenever the in-memory cache had any hit.
                 */
                List<EVCacheKey> retryEVCacheKeys = collectBulkMissKeys(evcKeys, retMap);
                if (!retryEVCacheKeys.isEmpty()) {
                    fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup());
                }
                if (fbClients != null && !fbClients.isEmpty()) {
                    for (int ind = 0; ind < fbClients.size(); ind++) {
                        final EVCacheClient fbClient = fbClients.get(ind);
                        if (event != null) {
                            try {
                                if (shouldThrottle(event)) {
                                    status = EVCacheMetricsFactory.THROTTLED;
                                    if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & keys " + retryEVCacheKeys);
                                    return null;
                                }
                            } catch(EVCacheException ex) {
                                status = EVCacheMetricsFactory.THROTTLED;
                                if(throwExc) throw ex;
                                return null;
                            }
                        }
                        tries++;
                        final Map<EVCacheKey, T> fbRetMap = getBulkData(fbClient, retryEVCacheKeys, tc, false, hasZF);
                        if (log.isDebugEnabled() && shouldLog()) log.debug("Fallback for APP " + _appName + ", key [" + retryEVCacheKeys + "], Fallback Server Group : " + fbClient .getServerGroup().getName());
                        // getBulkData results are treated as nullable elsewhere in this method. A null
                        // from one fallback client must not throw away the partial hit already held in
                        // retMap; move on to the next client with the same retry keys.
                        if (fbRetMap == null) continue;
                        for (Map.Entry<EVCacheKey, T> i : fbRetMap.entrySet()) {
                            retMap.put(i.getKey(), i.getValue());
                            if (log.isDebugEnabled() && shouldLog()) log.debug("Fallback for APP " + _appName + ", key [" + i.getKey() + (log.isTraceEnabled() ? "], Value [" + i.getValue(): "]"));
                        }
                        if (retryEVCacheKeys.size() == fbRetMap.size()) break;
                        // Rebuild the retry list only if another fallback client will actually use it.
                        if (ind < fbClients.size() - 1) {
                            retryEVCacheKeys = collectBulkMissKeys(evcKeys, retMap);
                        }
                    }
                }
                if (log.isDebugEnabled() && shouldLog() && retMap.size() == evcKeys.size()) log.debug("Fallback SUCCESS for APP " + _appName + ", retMap [" + retMap + "]");
            }
        }

        if (decanonicalR.isEmpty()) {
            if (retMap == null || retMap.isEmpty()) {
                if (log.isInfoEnabled() && shouldLog()) log.info("BULK : APP " + _appName + " ; Full cache miss for keys : " + keys);
                if (event != null) event.setAttribute("status", "BMISS_ALL");
                final Map<String, T> returnMap = new HashMap<String, T>();
                // Null markers are only added when a lookup really returned "nothing found".
                if (retMap != null && retMap.isEmpty()) {
                    for (String k : keys) {
                        returnMap.put(k, null);
                    }
                }
                //increment("BulkMissFull");
                cacheOperation = EVCacheMetricsFactory.NO;
                /* If both Retry and first request fail Exit Immediately. */
                if (event != null) endEvent(event);
                return returnMap;
            }
        }

        /*
         * Decanonicalize the keys. retMap can still be null here when the in-memory cache served
         * some keys (so the full-miss exit above was skipped) but the remote lookup returned null.
         * Dereferencing it raised a NullPointerException that discarded the in-memory hits.
         */
        final boolean remoteLookupFailed = (retMap == null);
        final Map<EVCacheKey, T> remoteHits = remoteLookupFailed ? Collections.<EVCacheKey, T> emptyMap() : retMap;
        boolean partialHit = false;
        final List<String> decanonicalHitKeys = new ArrayList<String>(remoteHits.size());
        for (EVCacheKey key : evcKeys) {
            final String deCanKey = key.getKey();
            final T value = remoteHits.get(key);
            if (value != null) {
                decanonicalR.put(deCanKey, value);
                if (touch) touchData(key, timeToLive);
                decanonicalHitKeys.add(deCanKey);
            } else {
                partialHit = true;
                // this ensures the fallback was tried; mirrors the full-miss branch, which adds
                // null markers only when the remote lookup returned a (possibly empty) map
                if (!remoteLookupFailed) decanonicalR.put(deCanKey, null);
            }
        }
        if (!decanonicalR.isEmpty()) {
            if (!partialHit) {
                if (event != null) event.setAttribute("status", "BHIT");
            } else {
                if (event != null) {
                    event.setAttribute("status", "BHIT_PARTIAL");
                    event.setAttribute("BHIT_PARTIAL_KEYS", decanonicalHitKeys);
                }
                //increment("BulkHitPartial");
                cacheOperation = EVCacheMetricsFactory.PARTIAL;
                if (log.isInfoEnabled() && shouldLog()) log.info("BULK_HIT_PARTIAL for APP " + _appName + ", keys in cache [" + decanonicalR + "], all keys [" + keys + "]");
            }
        }
        if (log.isDebugEnabled() && shouldLog()) log.debug("BulkGet; APP " + _appName + ", keys : " + keys + (log.isTraceEnabled() ? "; value : " + decanonicalR : ""));
        if (event != null) endEvent(event);
        return decanonicalR;
    } catch (net.spy.memcached.internal.CheckedOperationTimeoutException ex) {
        status = EVCacheMetricsFactory.TIMEOUT;
        if (log.isDebugEnabled() && shouldLog()) log.debug("CheckedOperationTimeoutException getting bulk data for APP " + _appName + ", keys : " + evcKeys, ex);
        if (event != null) {
            event.setStatus(status);
            eventError(event, ex);
        }
        if (!throwExc) return null;
        throw new EVCacheException("CheckedOperationTimeoutException getting bulk data for APP " + _appName + ", keys = " + evcKeys
                + ".\nYou can set the following property to increase the timeout " + _appName + ".EVCacheClientPool.bulkReadTimeout=<timeout in milli-seconds>", ex);
    } catch (Exception ex) {
        status = EVCacheMetricsFactory.ERROR;
        if (log.isDebugEnabled() && shouldLog()) log.debug("Exception getting bulk data for APP " + _appName + ", keys = " + evcKeys, ex);
        if (event != null) {
            event.setStatus(status);
            eventError(event, ex);
        }
        if (!throwExc) return null;
        throw new EVCacheException("Exception getting bulk data for APP " + _appName + ", keys = " + evcKeys, ex);
    } finally {
        // Runs on every exit from the try block (including the throttle returns): record the
        // requested key count and the elapsed time tagged with the final status and attempt count.
        final long duration = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime()- start;
        if(bulkKeysSize == null) {
            final List<Tag> tagList = new ArrayList<Tag>(4);
            tagList.addAll(tags);
            tagList.add(new BasicTag(EVCacheMetricsFactory.CALL_TAG, EVCacheMetricsFactory.BULK_OPERATION));
            tagList.add(new BasicTag(EVCacheMetricsFactory.CALL_TYPE_TAG, EVCacheMetricsFactory.READ));
            // if(status != null) tagList.add(new BasicTag(EVCacheMetricsFactory.STATUS, status));
            // if(tries >= 0) tagList.add(new BasicTag(EVCacheMetricsFactory.ATTEMPT, String.valueOf(tries)));
            bulkKeysSize = EVCacheMetricsFactory.getInstance().getDistributionSummary(EVCacheMetricsFactory.OVERALL_KEYS_SIZE, tagList);
        }
        bulkKeysSize.record(keys.size());
        getTimer(Call.BULK.name(), EVCacheMetricsFactory.READ, cacheOperation, status, tries, maxReadDuration.get().intValue(), client.getServerGroup()).record(duration, TimeUnit.MILLISECONDS);
        if (log.isDebugEnabled() && shouldLog()) log.debug("BULK : APP " + _appName + " Took " + duration + " milliSec to get the value for key " + evcKeys);
    }
}

/**
 * Returns, in iteration order, the entries of {@code requested} that are not keys of {@code found}.
 *
 * @param requested keys that were asked for
 * @param found     results obtained so far; must not be {@code null}
 * @return a new mutable list of the keys still missing (possibly empty)
 */
private <T> List<EVCacheKey> collectBulkMissKeys(final Collection<EVCacheKey> requested, final Map<EVCacheKey, T> found) {
    final List<EVCacheKey> missing = new ArrayList<EVCacheKey>(Math.max(0, requested.size() - found.size()));
    for (EVCacheKey key : requested) {
        if (!found.containsKey(key)) {
            missing.add(key);
        }
    }
    return missing;
}