in evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java [1140:1213]
/**
 * Asynchronously reads the value stored under {@code key}.
 *
 * <p>A read client is taken from the pool and the lookup is delegated to {@code getData}.
 * When the primary lookup yields {@code null} and zone fallback is enabled, the remaining
 * read clients (excluding the primary client's server group) are queried in order and the
 * first non-null value wins.
 *
 * <p>Failure handling:
 * <ul>
 *   <li>a {@code null} key yields a {@code Single} that errors with
 *       {@link IllegalArgumentException};</li>
 *   <li>no available read client, or a throttled request, yields a {@code Single} that errors
 *       with {@code EVCacheException};</li>
 *   <li>errors raised while fetching are turned into a {@code null} result unless
 *       {@code doThrowException()} returned {@code true}, in which case they are rethrown
 *       wrapped in an {@code EVCacheException}.</li>
 * </ul>
 *
 * @param key       the cache key; must not be {@code null}
 * @param tc        transcoder handed through to {@code getData}
 * @param scheduler scheduler handed through to {@code getData}
 * @param <T>       type of the cached value
 * @return a {@code Single} emitting the value, or {@code null} on a miss
 */
public <T> Single<T> get(String key, Transcoder<T> tc, Scheduler scheduler) {
    if (null == key) return Single.error(new IllegalArgumentException("Key cannot be null"));

    final boolean throwExc = doThrowException();
    final EVCacheClient client = _pool.getEVCacheClientForRead();
    if (client == null) {
        incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.GET);
        return Single.error(new EVCacheException("Could not find a client to get the data APP " + _appName));
    }

    final EVCacheKey evcKey = getEVCacheKey(key);
    final EVCacheEvent event = createEVCacheEvent(Collections.singletonList(client), Call.GET);
    if (event != null) {
        event.setEVCacheKeys(Arrays.asList(evcKey));
        if (shouldThrottle(event)) {
            incrementFastFail(EVCacheMetricsFactory.THROTTLED, Call.GET);
            return Single.error(new EVCacheException("Request Throttled for app " + _appName + " & key " + key));
        }
        startEvent(event);
    }

    final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
    final boolean hasZF = hasZoneFallback();
    // With zone fallback the primary read must not throw, so that the fallback clients get a chance.
    final boolean throwEx = hasZF ? false : throwExc;

    return getData(client, evcKey, tc, throwEx, hasZF, scheduler).flatMap(data -> {
        if (data == null && hasZF) {
            final List<EVCacheClient> fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup());
            if (fbClients != null && !fbClients.isEmpty()) {
                // concat subscribes to the fallback lookups one after another; the first non-null value is taken.
                return Observable.concat(Observable.from(fbClients).map(
                        fbClient -> getData(fbClients.indexOf(fbClient), fbClients.size(), fbClient, evcKey, tc, throwEx, throwExc, false, scheduler) //TODO : for the last one make sure to pass throwExc
                                .toObservable()))
                        .firstOrDefault(null, fbData -> (fbData != null)).toSingle();
            }
        }
        return Single.just(data);
    }).map(data -> {
        if (data != null) {
            if (event != null) event.setAttribute("status", "GHIT");
        } else {
            if (event != null) event.setAttribute("status", "GMISS");
            if (log.isInfoEnabled() && shouldLog()) log.info("GET : APP " + _appName + " ; cache miss for key : " + evcKey);
        }
        if (log.isDebugEnabled() && shouldLog()) log.debug("GET : APP " + _appName + ", key [" + evcKey + (log.isTraceEnabled() ? "], Value [" + data : "") + "], ServerGroup : " + client.getServerGroup());
        if (event != null) endEvent(event);
        return data;
    }).onErrorReturn(ex -> {
        if (ex instanceof net.spy.memcached.internal.CheckedOperationTimeoutException) {
            if (event != null) {
                event.setStatus(EVCacheMetricsFactory.TIMEOUT);
                eventError(event, ex);
            }
            if (!throwExc) return null;
            throw sneakyThrow(new EVCacheException("CheckedOperationTimeoutException getting data for APP " + _appName + ", key = "
                    + evcKey
                    + ".\nYou can set the following property to increase the timeout " + _appName
                    + ".EVCacheClientPool.readTimeout=<timeout in milli-seconds>", ex));
        } else {
            if (event != null) {
                event.setStatus(EVCacheMetricsFactory.ERROR);
                eventError(event, ex);
            }
            if (!throwExc) return null;
            throw sneakyThrow(new EVCacheException("Exception getting data for APP " + _appName + ", key = " + evcKey, ex));
        }
    }).doAfterTerminate(() -> {
        final long duration = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime() - start;
        // Record under Call.GET: this is the plain GET path, so the timer name must match the
        // operation used for the fast-fail counters and the event above (was GET_AND_TOUCH).
        // NOTE(review): the timer is always tagged SUCCESS, even when the call terminated with
        // an error or timeout - confirm whether that is intended.
        getTimer(Call.GET.name(), EVCacheMetricsFactory.READ, null, EVCacheMetricsFactory.SUCCESS, 1, maxReadDuration.get().intValue(), client.getServerGroup()).record(duration, TimeUnit.MILLISECONDS);
        if (log.isDebugEnabled() && shouldLog()) log.debug("GET : APP " + _appName + ", Took " + duration + " milliSec.");
    });
}