in evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java [143:245]
/**
 * Enqueues an asynchronous get for {@code key} and returns the future tracking it.
 *
 * <p>The future is handed a latch with a count of two: one count is released by the operation's
 * {@code complete()} callback and the other by whichever path decodes the value. When the value
 * is decoded inline (or there is nothing to decode) {@code complete()} releases both counts;
 * when the decode is handed to {@code tcService}, the decode callback releases the second count
 * after it has published the decoded value.
 *
 * @param key      the key to fetch
 * @param tc       transcoder used to decode the payload; when {@code null} the transcoder
 *                 returned by {@code getTranscoder()} is used instead
 * @param listener listener to register on the returned future; skipped when {@code null}
 * @param <T>      type of the decoded value
 * @return the future for this get operation
 * @throws RuntimeException raised from the {@code gotData} callback (not from this call) when
 *                          data arrives and {@code tcService} is {@code null}
 */
public <T> EVCacheOperationFuture<T> asyncGet(final String key, final Transcoder<T> tc, EVCacheGetOperationListener<T> listener) {
    // we should only complete the latch when decode AND complete have completed
    final CountDownLatch latch = new CountDownLatch(2);
    final EVCacheOperationFuture<T> rv = new EVCacheOperationFuture<>(
            key, latch, new AtomicReference<T>(null), readTimeout.get(), executorService, client);
    final DistributionSummary dataSizeDS = getDataSizeDistributionSummary(
            EVCacheMetricsFactory.GET_OPERATION,
            EVCacheMetricsFactory.READ,
            EVCacheMetricsFactory.IPC_SIZE_INBOUND);
    // Fall back to the client's default transcoder when the caller did not supply one.
    @SuppressWarnings("unchecked")
    final Transcoder<T> transcoder = (tc == null) ? (Transcoder<T>) getTranscoder() : tc;
    final boolean shouldLog = log.isDebugEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName);
    final Operation op = opFact.get(key, new GetOperation.Callback() {
        // not volatile since only ever used from memcached loop callbacks
        private boolean asyncDecodeIssued = false;
        // both volatile to ensure sync across transcode threads and memcached loop
        private volatile T value;
        private volatile OperationStatus status = null;

        @Override
        public void gotData(String k, int flags, byte[] data) {
            if (isWrongKeyReturned(key, k)) {
                return;
            }
            if (shouldLog) {
                log.debug("Read data : key {}; flags : {}; data : {}", key, flags, data);
                if (data != null) {
                    log.debug("Key : {}; val size : {}", key, data.length);
                } else {
                    log.debug("Key : {}; val is null", key);
                }
            }
            if (data != null) {
                dataSizeDS.record(data.length);
                // Checked up-front for every payload, i.e. also when the decode ends up inline.
                if (tcService == null) {
                    log.error("tcService is null, will not be able to decode");
                    throw new RuntimeException("TranscoderSevice is null. Not able to decode");
                }
                CachedData chunk = new CachedData(flags, data, transcoder.getMaxSize());
                boolean doSync = alwaysDecodeSync || (!transcoder.asyncDecode(chunk));
                if (doSync) {
                    // Inline decode on the calling thread; status may still be unset here and is
                    // filled in by receivedStatus().
                    value = transcoder.decode(chunk);
                    rv.set(value, status);
                } else {
                    asyncDecodeIssued = true;
                    final Transcoder<T> wrappedTranscoder = decodeAndThen(transcoder, (decoded) -> {
                        value = decoded;
                        // The status read here can already be stale by the time rv.set() runs:
                        // receivedStatus() may publish the real status in between, and this call
                        // would then overwrite it with the old (possibly null) one. Mirror the
                        // re-check done in receivedStatus(): read status again with a separate
                        // volatile read and republish if it changed. This assumes rv.set() simply
                        // stores the value/status it is given.
                        final OperationStatus statusBefore = status;
                        rv.set(decoded, statusBefore);
                        final OperationStatus statusAfter = status;
                        if (statusAfter != statusBefore) {
                            rv.set(decoded, statusAfter);
                        }
                        // Release the decode half of the latch only after the final set.
                        latch.countDown();
                    });
                    tcService.decode(wrappedTranscoder, chunk);
                }
            }
        }

        @Override
        public void receivedStatus(OperationStatus status) {
            this.status = status;
            // On rare occasion, it might be possible that transcoder finishes and starts to call rv.set(),
            // at the exact time that receivedStatus here does a set(). This means that through unlucky timing
            // here we might drop the decoded value that was set by the transcoder.
            //
            // We add a simple if check to see if the transcode thread has changed the value after we did rv.set(),
            // and if it has, we will do it again. Since value is only set once (after decode), we need only a single
            // check here. It is important that it is a separate volatile read from value.
            T before = value;
            rv.set(before, status);
            T after = value;
            if (after != before) {
                rv.set(after, status);
            }
        }

        @Override
        public void complete() {
            // if an async decode was never issued, issue an extra countdown, since 2 latch values were set
            if (!asyncDecodeIssued) {
                latch.countDown();
            }
            latch.countDown();
            // Counted as a hit when a decode was handed off or a value has already been decoded.
            final String metricHit = (asyncDecodeIssued || value != null) ? EVCacheMetricsFactory.YES : EVCacheMetricsFactory.NO;
            // The host is only resolved for timed-out operations that still have an operation attached.
            final String host = ((rv.getStatus().getStatusCode().equals(StatusCode.TIMEDOUT) && rv.getOperation() != null) ? getHostName(rv.getOperation().getHandlingNode().getSocketAddress()) : null);
            // Record the elapsed wall-clock time since the future's start time, in milliseconds.
            getTimer(EVCacheMetricsFactory.GET_OPERATION, EVCacheMetricsFactory.READ, rv.getStatus(), metricHit, host, getReadMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS);
            rv.signalComplete();
        }
    });
    rv.setOperation(op);
    if (listener != null) rv.addListener(listener);
    mconn.enqueueOperation(key, op);
    return rv;
}