public EVCacheBulkGetFuture asyncGetBulk()

in evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java [284:373]


    public <T> EVCacheBulkGetFuture<T> asyncGetBulk(Collection<String> keys,
                                                    final Transcoder<T> tc,
                                                    EVCacheGetOperationListener<T> listener,
                                                    BiPredicate<MemcachedNode, String> nodeValidator) {
        final Map<String, Future<T>> m = new ConcurrentHashMap<String, Future<T>>();

        // Break the gets down into groups by key
        final Map<MemcachedNode, Collection<String>> chunks = new HashMap<MemcachedNode, Collection<String>>();
        final NodeLocator locator = mconn.getLocator();

        //Populate Node and key Map
        for (String key : keys) {
            EVCacheClientUtil.validateKey(key, opFact instanceof BinaryOperationFactory);
            final MemcachedNode primaryNode = locator.getPrimary(key);
            if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) {
                Collection<String> ks = chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>());
                ks.add(key);
            }
        }

        final AtomicInteger pendingChunks = new AtomicInteger(chunks.size());
        int initialLatchCount = chunks.isEmpty() ? 0 : 1;
        final CountDownLatch latch = new CountDownLatch(initialLatchCount);
        final Collection<Operation> ops = new ArrayList<Operation>(chunks.size());
        final EVCacheBulkGetFuture<T> rv = new EVCacheBulkGetFuture<T>(m, ops, latch, executorService, client);
        rv.setExpectedCount(chunks.size());

        final DistributionSummary dataSizeDS = getDataSizeDistributionSummary(
                        EVCacheMetricsFactory.BULK_OPERATION,
                        EVCacheMetricsFactory.READ,
                        EVCacheMetricsFactory.IPC_SIZE_INBOUND);

        class EVCacheBulkGetSingleFutureCallback implements GetOperation.Callback {
            final int thisOpId;
            GetOperation op = null;
            public EVCacheBulkGetSingleFutureCallback(int thisOpId) {
                this.thisOpId = thisOpId;
            }

            void bindOp(GetOperation op) {
                assert this.op == null;
                assert op != null;

                this.op = op;
            }

            @Override
            public void receivedStatus(OperationStatus status) {
                if (log.isDebugEnabled()) log.debug("GetBulk Keys : " + keys + "; Status : " + status.getStatusCode().name() + "; Message : " + status.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime()));
                rv.setStatus(status);
            }

            @Override
            public void gotData(String k, int flags, byte[] data) {
                if (data != null)  {
                    dataSizeDS.record(data.length);
                }
                m.put(k, tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize())));
            }

            @Override
            public void complete() {
                assert op != null;

                rv.signalSingleOpComplete(thisOpId, op);
                if (pendingChunks.decrementAndGet() <= 0) {
                    latch.countDown();
                    getTimer(EVCacheMetricsFactory.BULK_OPERATION, EVCacheMetricsFactory.READ, rv.getStatus(), (m.size() == keys.size() ? EVCacheMetricsFactory.YES : EVCacheMetricsFactory.NO), null, getReadMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS);
                    rv.signalComplete();
                }
            }
        };

        // Now that we know how many servers it breaks down into, and the latch
        // is all set up, convert all of these strings collections to operations
        final Map<MemcachedNode, Operation> mops = new HashMap<MemcachedNode, Operation>();
        int thisOpId = 0;
        for (Map.Entry<MemcachedNode, Collection<String>> me : chunks.entrySet()) {
            EVCacheBulkGetSingleFutureCallback cb = new EVCacheBulkGetSingleFutureCallback(thisOpId);
            GetOperation op = opFact.get(me.getValue(), cb);
            cb.bindOp(op);
            mops.put(me.getKey(), op);
            ops.add(op);
            thisOpId++;
        }
        assert mops.size() == chunks.size();
        mconn.checkState();
        mconn.addOperations(mops);
        return rv;
    }