private Map getBulk()

in evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java [2073:2307]


    /**
     * Reads the values for {@code keys} in bulk, consulting the in-memory cache first (when enabled)
     * and then the read client, with optional fallback to the other server groups.
     *
     * <p>Flow, as implemented below:
     * <ol>
     *   <li>Each key is canonicalized via {@code getEVCacheKey}; when {@code _useInMemoryCache} is on,
     *       values found in the in-memory cache are put straight into the result and are not
     *       requested remotely.</li>
     *   <li>The remaining keys are fetched with {@code getBulkData} from the primary read client.</li>
     *   <li>If {@code hasZoneFallbackForBulk()} is true and the primary returned nothing, every
     *       fallback client is tried in order until one returns data. If the primary returned only
     *       some keys and {@code _bulkPartialZoneFallbackFP} is set, the still-missing keys are
     *       retried against the fallback clients.</li>
     *   <li>Results are re-keyed by {@code EVCacheKey.getKey()}; remotely requested keys that were
     *       not found are mapped to {@code null}.</li>
     * </ol>
     *
     * @param keys       the keys to look up; must not be {@code null}
     * @param tc         transcoder to use, or {@code null} to fall back to {@code _transcoder} /
     *                   the client's transcoder
     * @param touch      when {@code true}, {@code touchData(key, timeToLive)} is invoked for every
     *                   key that was found remotely (in-memory hits are not touched)
     * @param timeToLive TTL validated by {@code checkTTL} and used for touch and the event
     * @return the values keyed by the de-canonicalized key; an empty map for empty input, a missing
     *         client or a throttled request; {@code null} on several failure paths (timeout, generic
     *         error, throttling during fallback) when {@code doThrowException()} is {@code false}
     * @throws IllegalArgumentException if {@code keys} is {@code null}
     * @throws EVCacheException if the lookup fails or is throttled and {@code doThrowException()}
     *         is {@code true}, or if the in-memory cache lookup raises an {@code ExecutionException}
     */
    private <T> Map<String, T> getBulk(final Collection<String> keys, Transcoder<T> tc, boolean touch, int timeToLive) throws EVCacheException {
        if (null == keys) throw new IllegalArgumentException("keys cannot be null");
        if (keys.isEmpty()) return Collections.<String, T> emptyMap();
        checkTTL(timeToLive, Call.BULK);
        final boolean throwExc = doThrowException();
        final EVCacheClient client = _pool.getEVCacheClientForRead();
        if (client == null) {
            incrementFastFail(EVCacheMetricsFactory.NULL_CLIENT, Call.BULK);
            if (throwExc) throw new EVCacheException("Could not find a client to get the data in bulk");
            return Collections.<String, T> emptyMap();// Fast failure
        }

        final Map<String, T> decanonicalR = new HashMap<String, T>((keys.size() * 4) / 3 + 1);
        final Collection<EVCacheKey> evcKeys = new ArrayList<EVCacheKey>(keys.size());
        /* Canonicalize keys and perform fast failure checking */
        for (String k : keys) {
            final EVCacheKey evcKey = getEVCacheKey(k);
            T value = null;
            if (_useInMemoryCache.get()) {
                try {
                    // Use the client that was already resolved and null-checked above instead of asking the
                    // pool again per key: a second lookup is redundant and was dereferenced without a null check.
                    // NOTE(review): assumes every read client of the pool exposes the same transcoder - confirm.
                    final Transcoder<T> transcoder = (tc == null) ? ((_transcoder == null) ? (Transcoder<T>) client.getTranscoder() : (Transcoder<T>) _transcoder) : tc;
                    value = (T) getInMemoryCache(transcoder).get(evcKey);
                    if(value == null)  if (log.isInfoEnabled() && shouldLog()) log.info("Value not_found in inmemory cache for APP " + _appName + ", key : " + evcKey + "; value : " + value );
                } catch (ExecutionException e) {
                    if (log.isDebugEnabled() && shouldLog()) log.debug("ExecutionException while getting data from InMemory Cache", e);
                    throw new EVCacheException("ExecutionException", e);
                }
            }
            if(value == null) {
                // Not served from memory: must be requested from the remote cache.
                evcKeys.add(evcKey);
            } else {
                decanonicalR.put(evcKey.getKey(), value);
                if (log.isDebugEnabled() && shouldLog()) log.debug("Value retrieved from inmemory cache for APP " + _appName + ", key : " + evcKey + (log.isTraceEnabled() ? "; value : " + value : ""));
            }
        }

        // Nothing left to fetch remotely means every key was an in-memory hit. Comparing result size with
        // keys.size() here would wrongly fall through when the caller passed duplicate keys.
        if (evcKeys.isEmpty()) {
            if (log.isDebugEnabled() && shouldLog()) log.debug("All Values retrieved from inmemory cache for APP " + _appName + ", keys : " + keys + (log.isTraceEnabled() ? "; value : " + decanonicalR : ""));
            return decanonicalR;
        }

        final EVCacheEvent event = createEVCacheEvent(Collections.singletonList(client), Call.BULK);
        if (event != null) {
            event.setEVCacheKeys(evcKeys);
            try {
                if (shouldThrottle(event)) {
                    incrementFastFail(EVCacheMetricsFactory.THROTTLED, Call.BULK);
                    if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & keys " + keys);
                    return Collections.<String, T> emptyMap();
                }
            } catch(EVCacheException ex) {
                if(throwExc) throw ex;
                incrementFastFail(EVCacheMetricsFactory.THROTTLED, Call.BULK);
                // NOTE(review): this path returns null while the non-exception throttle path above returns an
                // empty map; left unchanged because callers may rely on it.
                return null;
            }
            event.setTTL(timeToLive);
            startEvent(event);
        }


        final long start = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime();
        String cacheOperation = EVCacheMetricsFactory.YES;
        int tries = 1;
        String status = EVCacheMetricsFactory.SUCCESS;
        try {
            final boolean hasZF = hasZoneFallbackForBulk();
            // With zone fallback available, errors from the primary are suppressed so the fallback can run.
            boolean throwEx = hasZF ? false : throwExc;
            Map<EVCacheKey, T> retMap = getBulkData(client, evcKeys, tc, throwEx, hasZF);
            List<EVCacheClient> fbClients = null;
            if (hasZF) {
                if (retMap == null || retMap.isEmpty()) {
                    /* Full miss on the primary: retry the whole request on each fallback client in turn. */
                    fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup());
                    if (fbClients != null && !fbClients.isEmpty()) {
                        for (int i = 0; i < fbClients.size(); i++) {
                            final EVCacheClient fbClient = fbClients.get(i);
                            // Only the last fallback attempt is allowed to surface exceptions to the caller.
                            if(i >= fbClients.size() - 1) throwEx = throwExc;
                            if (event != null) {
                                // NOTE(review): the throttle returns below leave the started event without
                                // an endEvent call - confirm whether that is intended.
                                try {
                                    if (shouldThrottle(event)) {
                                        status = EVCacheMetricsFactory.THROTTLED;
                                        if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & key " + evcKeys);
                                        return null;
                                    }
                                } catch(EVCacheException ex) {
                                    if(throwExc) throw ex;
                                    status = EVCacheMetricsFactory.THROTTLED;
                                    return null;
                                }
                            }
                            tries++;
                            retMap = getBulkData(fbClient, evcKeys, tc, throwEx, i < fbClients.size() - 1);
                            if (log.isDebugEnabled() && shouldLog()) log.debug("Fallback for APP " + _appName + ", key [" + evcKeys + (log.isTraceEnabled() ? "], Value [" + retMap : "") + "], zone : " + fbClient.getZone());
                            if (retMap != null && !retMap.isEmpty()) break;
                        }
                        //increment("BULK-FULL_RETRY-" + ((retMap == null || retMap.isEmpty()) ? "MISS" : "HIT"));
                    }
                } else if (evcKeys.size() > retMap.size() && _bulkPartialZoneFallbackFP.get()) {
                    /*
                     * Partial hit on the primary: retry only the keys that are still missing. The comparison is
                     * against evcKeys (what was actually requested remotely), not keys, because keys also
                     * counts entries already served from the in-memory cache.
                     */
                    List<EVCacheKey> retryEVCacheKeys = missingBulkKeys(evcKeys, retMap);

                    if (!retryEVCacheKeys.isEmpty()) {
                        fbClients = _pool.getEVCacheClientsForReadExcluding(client.getServerGroup());
                    }
                    if (fbClients != null && !fbClients.isEmpty()) {
                        for (int ind = 0; ind < fbClients.size(); ind++) {
                            final EVCacheClient fbClient = fbClients.get(ind);
                            if (event != null) {
                                try {
                                    if (shouldThrottle(event)) {
                                        status = EVCacheMetricsFactory.THROTTLED;
                                        if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & keys " + retryEVCacheKeys);
                                        return null;
                                    }
                                } catch(EVCacheException ex) {
                                    status = EVCacheMetricsFactory.THROTTLED;
                                    if(throwExc) throw ex;
                                    return null;
                                }
                            }
                            tries++;

                            final Map<EVCacheKey, T> fbRetMap = getBulkData(fbClient, retryEVCacheKeys, tc, false, hasZF);
                            if (log.isDebugEnabled() && shouldLog()) log.debug("Fallback for APP " + _appName + ", key [" + retryEVCacheKeys + "], Fallback Server Group : " + fbClient .getServerGroup().getName());
                            // A fallback that produced nothing (null or empty) leaves the retry list unchanged.
                            if (fbRetMap == null || fbRetMap.isEmpty()) continue;
                            for (Map.Entry<EVCacheKey, T> i : fbRetMap.entrySet()) {
                                retMap.put(i.getKey(), i.getValue());
                                if (log.isDebugEnabled() && shouldLog()) log.debug("Fallback for APP " + _appName + ", key [" + i.getKey() + (log.isTraceEnabled() ? "], Value [" + i.getValue(): "]"));
                            }
                            if (retryEVCacheKeys.size() == fbRetMap.size()) break;
                            // Recompute the outstanding keys only if another fallback client will use them.
                            if (ind < fbClients.size() - 1) {
                                retryEVCacheKeys = missingBulkKeys(evcKeys, retMap);
                                if (retryEVCacheKeys.isEmpty()) break;
                            }
                        }
                    }
                    if (log.isDebugEnabled() && shouldLog() && retMap.size() == evcKeys.size()) log.debug("Fallback SUCCESS for APP " + _appName + ",  retMap [" + retMap + "]");
                }
            }

            if(decanonicalR.isEmpty()) {
                if (retMap == null || retMap.isEmpty()) {
                    if (log.isInfoEnabled() && shouldLog()) log.info("BULK : APP " + _appName + " ; Full cache miss for keys : " + keys);
                    if (event != null) event.setAttribute("status", "BMISS_ALL");
                    final Map<String, T> returnMap = new HashMap<String, T>();
                    // An empty (non-null) result maps every key to null; a null result yields an empty map.
                    if (retMap != null && retMap.isEmpty()) {
                        for (String k : keys) {
                            returnMap.put(k, null);
                        }
                    }
                    //increment("BulkMissFull");
                    cacheOperation = EVCacheMetricsFactory.NO;
                    /* If both Retry and first request fail Exit Immediately. */
                    if (event != null) endEvent(event);
                    return returnMap;
                }
            }

            // The remote result can still be null here when the in-memory cache supplied some values; treat
            // it as "no remote hits" so those in-memory values are returned instead of failing with an NPE.
            final Map<EVCacheKey, T> remoteHits = (retMap != null) ? retMap : Collections.<EVCacheKey, T> emptyMap();

            /* Decanonicalize the keys */
            boolean partialHit = false;
            final List<String> decanonicalHitKeys = new ArrayList<String>(remoteHits.size());
            for (EVCacheKey key : evcKeys) {
                final String deCanKey = key.getKey();
                final T value = remoteHits.get(key);
                if (value != null) {
                    decanonicalR.put(deCanKey, value);
                    if (touch) touchData(key, timeToLive);
                    decanonicalHitKeys.add(deCanKey);
                } else {
                    partialHit = true;
                    // this ensures the fallback was tried
                    decanonicalR.put(deCanKey, null);
                }
            }
            if (!decanonicalR.isEmpty()) {
                if (!partialHit) {
                    if (event != null) event.setAttribute("status", "BHIT");
                } else {
                    if (event != null) {
                        event.setAttribute("status", "BHIT_PARTIAL");
                        event.setAttribute("BHIT_PARTIAL_KEYS", decanonicalHitKeys);
                    }
                    //increment("BulkHitPartial");
                    cacheOperation = EVCacheMetricsFactory.PARTIAL;
                    if (log.isInfoEnabled() && shouldLog()) log.info("BULK_HIT_PARTIAL for APP " + _appName + ", keys in cache [" + decanonicalR + "], all keys [" + keys + "]");
                }
            }

            if (log.isDebugEnabled() && shouldLog()) log.debug("BulkGet; APP " + _appName + ", keys : " + keys + (log.isTraceEnabled() ? "; value : " + decanonicalR : ""));
            if (event != null) endEvent(event);
            return decanonicalR;
        } catch (net.spy.memcached.internal.CheckedOperationTimeoutException ex) {
            status = EVCacheMetricsFactory.TIMEOUT;
            if (log.isDebugEnabled() && shouldLog()) log.debug("CheckedOperationTimeoutException getting bulk data for APP " + _appName + ", keys : " + evcKeys, ex);
            if (event != null) {
                event.setStatus(status);
                eventError(event, ex);
            }
            if (!throwExc) return null;
            throw new EVCacheException("CheckedOperationTimeoutException getting bulk data for APP " + _appName + ", keys = " + evcKeys
                    + ".\nYou can set the following property to increase the timeout " + _appName + ".EVCacheClientPool.bulkReadTimeout=<timeout in milli-seconds>", ex);
        } catch (Exception ex) {
            status = EVCacheMetricsFactory.ERROR;
            if (log.isDebugEnabled() && shouldLog()) log.debug("Exception getting bulk data for APP " + _appName + ", keys = " + evcKeys, ex);
            if (event != null) {
                event.setStatus(status);
                eventError(event, ex);
            }
            if (!throwExc) return null;
            throw new EVCacheException("Exception getting bulk data for APP " + _appName + ", keys = " + evcKeys, ex);
        } finally {
            // Metrics are recorded on every exit from the try block, including early returns and exceptions.
            final long duration = EVCacheMetricsFactory.getInstance().getRegistry().clock().wallTime()- start;

            // Lazily create the distribution summary the first time a bulk read reaches this point.
            if(bulkKeysSize == null) {
                final List<Tag> tagList = new ArrayList<Tag>(4);
                tagList.addAll(tags);
                tagList.add(new BasicTag(EVCacheMetricsFactory.CALL_TAG, EVCacheMetricsFactory.BULK_OPERATION));
                tagList.add(new BasicTag(EVCacheMetricsFactory.CALL_TYPE_TAG, EVCacheMetricsFactory.READ));
//                if(status != null) tagList.add(new BasicTag(EVCacheMetricsFactory.STATUS, status));
//                if(tries >= 0) tagList.add(new BasicTag(EVCacheMetricsFactory.ATTEMPT, String.valueOf(tries)));
                bulkKeysSize = EVCacheMetricsFactory.getInstance().getDistributionSummary(EVCacheMetricsFactory.OVERALL_KEYS_SIZE, tagList);
            }
            bulkKeysSize.record(keys.size());
            getTimer(Call.BULK.name(), EVCacheMetricsFactory.READ, cacheOperation, status, tries, maxReadDuration.get().intValue(), client.getServerGroup()).record(duration, TimeUnit.MILLISECONDS);
            if (log.isDebugEnabled() && shouldLog()) log.debug("BULK : APP " + _appName + " Took " + duration + " milliSec to get the value for key " + evcKeys);
        }
    }

    /**
     * Returns the keys of {@code requested} that have no entry in {@code found}, in the iteration
     * order of {@code requested}.
     */
    private static <V> List<EVCacheKey> missingBulkKeys(final Collection<EVCacheKey> requested, final Map<EVCacheKey, V> found) {
        // Capacity is only a hint; clamp at zero so an unexpected size relation can never throw.
        final List<EVCacheKey> missing = new ArrayList<EVCacheKey>(Math.max(0, requested.size() - found.size()));
        for (EVCacheKey key : requested) {
            if (!found.containsKey(key)) missing.add(key);
        }
        return missing;
    }