in evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java [610:655]
/**
 * Builds a store operation for {@code key}, enqueues it on the connection and returns a future
 * that tracks its outcome.
 *
 * <p>If {@code value} is already a {@link CachedData} it is used as-is; otherwise it is encoded
 * with {@code tc}. The size of the outbound payload is recorded before the operation is created,
 * and the elapsed time is recorded when the operation completes.
 *
 * @param storeType    kind of store to perform; {@code set} and {@code add} are reported under their
 *                     own metric names, every other type is reported under the replace metric
 * @param key          key to store the value under
 * @param exp          expiration value, passed unchanged to the operation factory
 * @param value        value to store, either a ready-made {@link CachedData} or an object {@code tc} can encode
 * @param tc           transcoder used to encode {@code value}; not consulted when {@code value} is a {@link CachedData}
 * @param evcacheLatch optional latch; when it is an {@code EVCacheLatchImpl} and the client is not in
 *                     write-only mode, the returned future is registered with it before the operation is enqueued
 * @return a future whose value is the success flag of the status reported for the operation
 * @throws NullPointerException if the encoded form of {@code value} is {@code null}
 */
private <T> OperationFuture<Boolean> asyncStore(final StoreType storeType, final String key, int exp, T value, Transcoder<T> tc, EVCacheLatch evcacheLatch) {
    final CachedData co;
    if (value instanceof CachedData) {
        co = (CachedData) value;
    } else {
        co = tc.encode(value);
    }
    // The operation below needs the flags and data of the encoded value, so a missing encoding
    // is rejected up front with a descriptive message instead of failing later on a dereference.
    if (co == null) {
        throw new NullPointerException("Encoded value is null for key : " + key);
    }

    final CountDownLatch latch = new CountDownLatch(1);
    final String operationStr;
    if (storeType == StoreType.set) {
        operationStr = EVCacheMetricsFactory.SET_OPERATION;
    } else if (storeType == StoreType.add) {
        operationStr = EVCacheMetricsFactory.ADD_OPERATION;
    } else {
        operationStr = EVCacheMetricsFactory.REPLACE_OPERATION;
    }

    // Record the outbound payload size only when there is a payload to measure.
    if (co.getData() != null) {
        getDataSizeDistributionSummary(operationStr, EVCacheMetricsFactory.WRITE, EVCacheMetricsFactory.IPC_SIZE_OUTBOUND).record(co.getData().length);
    }

    final EVCacheOperationFuture<Boolean> rv = new EVCacheOperationFuture<Boolean>(key, latch, new AtomicReference<Boolean>(null), operationTimeout, executorService, client);
    final Operation op = opFact.store(storeType, key, co.getFlags(), exp, co.getData(), new StoreOperation.Callback() {
        @Override
        public void receivedStatus(OperationStatus val) {
            if (log.isDebugEnabled()) {
                log.debug("Storing Key : " + key + "; Status : " + val.getStatusCode().name() + (log.isTraceEnabled() ? " Node : " + getEVCacheNode(key) : "") + "; Message : " + val.getMessage()
                        + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime()));
            }
            rv.set(val.isSuccess(), val);
            // The exception is never thrown; it is passed to the logger so the trace output carries a stack trace.
            if (log.isTraceEnabled() && !val.getStatusCode().equals(StatusCode.SUCCESS)) {
                log.trace(val.getStatusCode().name() + " storing Key : " + key , new Exception());
            }
        }

        @Override
        public void gotData(String key, long cas) {
            rv.setCas(cas);
        }

        @Override
        public void complete() {
            // The latch is released before the metric is recorded and listeners are signalled.
            latch.countDown();

            final StatusCode statusCode = rv.getStatus().getStatusCode();
            final boolean reportHost = statusCode.equals(StatusCode.TIMEDOUT) || statusCode.equals(StatusCode.ERR_NO_MEM);
            // The host is only attached for timeouts and out-of-memory errors. Both the operation and its
            // handling node are null-checked so that a missing node cannot throw here and prevent
            // signalComplete() from running.
            final String host;
            if (reportHost && rv.getOperation() != null && rv.getOperation().getHandlingNode() != null) {
                host = getHostName(rv.getOperation().getHandlingNode().getSocketAddress());
            } else {
                host = null;
            }

            getTimer(operationStr, EVCacheMetricsFactory.WRITE, rv.getStatus(), null, host, getWriteMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS);
            rv.signalComplete();
        }
    });
    rv.setOperation(op);

    // instanceof is already false for null, so no separate null check is required.
    if (evcacheLatch instanceof EVCacheLatchImpl && !client.isInWriteOnly()) {
        ((EVCacheLatchImpl) evcacheLatch).addFuture(rv);
    }
    mconn.enqueueOperation(key, op);
    return rv;
}