in evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java [284:373]
/**
 * Issues a bulk (multi-key) asynchronous get, splitting the requested keys into one
 * GetOperation per owning memcached node and aggregating results into a single
 * {@link EVCacheBulkGetFuture}.
 *
 * Keys whose primary node is inactive, or which are rejected by {@code nodeValidator},
 * are silently skipped (they will simply be absent from the result map, i.e. treated
 * as cache misses).
 *
 * @param keys          the cache keys to fetch; each is validated before routing
 * @param tc            transcoder used to decode each returned value
 * @param listener      NOTE(review): accepted but never referenced in this method body —
 *                      presumably consumed by an overload or by future wiring; confirm
 * @param nodeValidator per-(node, key) predicate that can veto dispatch to a node
 * @return a future over a map of key -> decode-future; completes once every per-node
 *         chunk operation has called back
 */
public <T> EVCacheBulkGetFuture<T> asyncGetBulk(Collection<String> keys,
final Transcoder<T> tc,
EVCacheGetOperationListener<T> listener,
BiPredicate<MemcachedNode, String> nodeValidator) {
// Concurrent because gotData() callbacks may arrive on I/O threads.
final Map<String, Future<T>> m = new ConcurrentHashMap<String, Future<T>>();
// Break the gets down into groups by key
final Map<MemcachedNode, Collection<String>> chunks = new HashMap<MemcachedNode, Collection<String>>();
final NodeLocator locator = mconn.getLocator();
//Populate Node and key Map
for (String key : keys) {
// Binary protocol has different key rules; validation is protocol-aware.
EVCacheClientUtil.validateKey(key, opFact instanceof BinaryOperationFactory);
final MemcachedNode primaryNode = locator.getPrimary(key);
// Skip (drop as a miss) any key whose node is down or vetoed by the validator.
if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) {
Collection<String> ks = chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>());
ks.add(key);
}
}
// pendingChunks counts outstanding per-node operations; the latch is a single-shot
// gate released by the LAST callback to complete. With zero chunks the latch starts
// open so waiters never block.
final AtomicInteger pendingChunks = new AtomicInteger(chunks.size());
int initialLatchCount = chunks.isEmpty() ? 0 : 1;
final CountDownLatch latch = new CountDownLatch(initialLatchCount);
final Collection<Operation> ops = new ArrayList<Operation>(chunks.size());
final EVCacheBulkGetFuture<T> rv = new EVCacheBulkGetFuture<T>(m, ops, latch, executorService, client);
rv.setExpectedCount(chunks.size());
final DistributionSummary dataSizeDS = getDataSizeDistributionSummary(
EVCacheMetricsFactory.BULK_OPERATION,
EVCacheMetricsFactory.READ,
EVCacheMetricsFactory.IPC_SIZE_INBOUND);
// One callback instance per per-node chunk. The callback must exist before the
// operation (opFact.get takes it), so the operation is bound back via bindOp()
// immediately after creation — before the op is handed to the connection.
class EVCacheBulkGetSingleFutureCallback implements GetOperation.Callback {
final int thisOpId;
GetOperation op = null;
public EVCacheBulkGetSingleFutureCallback(int thisOpId) {
this.thisOpId = thisOpId;
}
// Late-binds the operation this callback belongs to; must be called exactly once.
void bindOp(GetOperation op) {
assert this.op == null;
assert op != null;
this.op = op;
}
@Override
public void receivedStatus(OperationStatus status) {
if (log.isDebugEnabled()) log.debug("GetBulk Keys : " + keys + "; Status : " + status.getStatusCode().name() + "; Message : " + status.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime()));
// Last status received wins as the future's overall status.
rv.setStatus(status);
}
@Override
public void gotData(String k, int flags, byte[] data) {
if (data != null) {
dataSizeDS.record(data.length);
}
// Decode is deferred to tcService; the map holds a Future per key.
m.put(k, tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize())));
}
@Override
public void complete() {
assert op != null;
rv.signalSingleOpComplete(thisOpId, op);
// Only the final chunk's completion releases the latch, records the bulk
// timer (tagged YES only on a full hit: every requested key returned),
// and signals the aggregate future.
if (pendingChunks.decrementAndGet() <= 0) {
latch.countDown();
getTimer(EVCacheMetricsFactory.BULK_OPERATION, EVCacheMetricsFactory.READ, rv.getStatus(), (m.size() == keys.size() ? EVCacheMetricsFactory.YES : EVCacheMetricsFactory.NO), null, getReadMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS);
rv.signalComplete();
}
}
};
// Now that we know how many servers it breaks down into, and the latch
// is all set up, convert all of these strings collections to operations
final Map<MemcachedNode, Operation> mops = new HashMap<MemcachedNode, Operation>();
int thisOpId = 0;
for (Map.Entry<MemcachedNode, Collection<String>> me : chunks.entrySet()) {
EVCacheBulkGetSingleFutureCallback cb = new EVCacheBulkGetSingleFutureCallback(thisOpId);
GetOperation op = opFact.get(me.getValue(), cb);
cb.bindOp(op);
mops.put(me.getKey(), op);
ops.add(op);
thisOpId++;
}
assert mops.size() == chunks.size();
// Operations are enqueued only after everything (latch, future, bindings) is wired,
// since callbacks can start firing as soon as addOperations runs.
mconn.checkState();
mconn.addOperations(mops);
return rv;
}