public EVCacheOperationFuture asyncGet()

in evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java [143:245]


    /**
     * Builds a get operation for {@code key}, enqueues it on the memcached connection, and returns
     * a future through which the decoded value and operation status are published.
     *
     * <p>The future is backed by a {@link CountDownLatch} initialised to 2. One count is released by
     * {@code complete()}; the other is released either by the asynchronous decode callback (when an
     * async decode was issued from {@code gotData}) or by {@code complete()} itself when no async
     * decode was issued.
     *
     * @param key      the key to fetch
     * @param tc       transcoder used to decode the returned bytes; when {@code null} the result of
     *                 {@code getTranscoder()} is used instead
     * @param listener optional listener; when non-null it is added to the future before the
     *                 operation is enqueued
     * @param <T>      the decoded value type
     * @return the future associated with the enqueued operation
     */
    public <T> EVCacheOperationFuture<T> asyncGet(final String key, final Transcoder<T> tc, EVCacheGetOperationListener<T> listener) {
        // we should only complete the latch when decode AND complete have completed
        final CountDownLatch latch = new CountDownLatch(2);

        final EVCacheOperationFuture<T> rv = new EVCacheOperationFuture<>(
                key, latch, new AtomicReference<T>(null), readTimeout.get(), executorService, client);

        // Distribution summary that records the size in bytes of each non-null payload received.
        final DistributionSummary dataSizeDS = getDataSizeDistributionSummary(
                EVCacheMetricsFactory.GET_OPERATION,
                EVCacheMetricsFactory.READ,
                EVCacheMetricsFactory.IPC_SIZE_INBOUND);

        // Fall back to the client's own transcoder when the caller did not supply one.
        @SuppressWarnings("unchecked")
        final Transcoder<T> transcoder = (tc == null) ? (Transcoder<T>) getTranscoder() : tc;
        // Evaluated once up front so the callbacks below do not repeat the lookup per invocation.
        final boolean shouldLog = log.isDebugEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName);

        final Operation op = opFact.get(key, new GetOperation.Callback() {
            // not volatile since only ever used from memcached loop callbacks
            private boolean asyncDecodeIssued = false;

            // both volatile to ensure sync across transcode threads and memcached loop
            private volatile T value;
            private volatile OperationStatus status = null;

            // Receives the raw payload for a key; decodes it either inline or via tcService.
            public void gotData(String k, int flags, byte[] data) {
                // Ignore the payload entirely when the helper reports a key mismatch
                // (presumably the server answered for a different key than requested -- see isWrongKeyReturned).
                if (isWrongKeyReturned(key, k)) {
                    return;
                }

                if (shouldLog) {
                    log.debug("Read data : key {}; flags : {}; data : {}", key, flags, data);
                    if (data != null) {
                        log.debug("Key : {}; val size : {}", key, data.length);
                    } else {
                        log.debug("Key : {}; val is null", key);
                    }
                }

                // A null payload is left untouched: value stays null and no decode is issued.
                if (data != null)  {
                    dataSizeDS.record(data.length);

                    // NOTE(review): this check runs before the sync/async decision below, so a null
                    // tcService throws even when the decode would have been done synchronously
                    // without it -- confirm whether that is intended.
                    if (tcService == null) {
                        log.error("tcService is null, will not be able to decode");
                        throw new RuntimeException("TranscoderSevice is null. Not able to decode");
                    }

                    CachedData chunk = new CachedData(flags, data, transcoder.getMaxSize());
                    // Decode inline when forced by alwaysDecodeSync or when the transcoder does not
                    // ask for an async decode of this chunk.
                    boolean doSync = alwaysDecodeSync || (!transcoder.asyncDecode(chunk));

                    if (doSync) {
                        value = transcoder.decode(chunk);
                        // status is whatever receivedStatus has stored so far; it is still null if
                        // receivedStatus has not been invoked yet.
                        rv.set(value, status);
                    } else {
                        // Remember that the decode callback now owns one of the two latch counts.
                        asyncDecodeIssued = true;
                        final Transcoder<T> wrappedTranscoder = decodeAndThen(transcoder, (decoded) -> {
                            value = decoded;
                            rv.set(decoded, status);
                            latch.countDown();
                        });
                        tcService.decode(wrappedTranscoder, chunk);
                    }
                }
            }

            // Stores the operation status and republishes the current value together with it.
            public void receivedStatus(OperationStatus status) {
                this.status = status;

                // On rare occasion, it might be possible that transcoder finishes and starts to call rv.set(),
                // at the exact time that receivedStatus here does a set(). This means that through unlucky timing
                // here we might drop the decoded value that was set by the transcoder.
                //
                // We add a simple if check to see if the transcode thread has changed the value after we did rv.set(),
                // and if it has, we will do it again. Since value is only set once (after decode), we need only a single
                // check here. It is important that it is a separate volatile read from value.

                T before = value;
                rv.set(before, status);
                T after = value;

                // Reference comparison is deliberate: it only detects that the field was reassigned.
                if (after != before) {
                    rv.set(after, status);
                }
            }

            // Releases this callback's share of the latch, records the timing metric, and signals the future.
            public void complete() {
                // if an async decode was never issued, issue an extra countdown, since 2 latch values were set
                if (!asyncDecodeIssued) {
                    latch.countDown();
                }

                latch.countDown();

                // Reported as a hit when an async decode was issued or a value is already present.
                // (Presumably asyncDecodeIssued is included because value may not be assigned yet
                // while the async decode is still in flight -- confirm.)
                final String metricHit = (asyncDecodeIssued || value != null) ? EVCacheMetricsFactory.YES : EVCacheMetricsFactory.NO;
                // The host tag is only resolved for TIMEDOUT statuses with a non-null operation; otherwise it is null.
                // NOTE(review): rv.getStatus() is dereferenced without a null check here -- confirm it
                // can never be null by the time complete() runs.
                final String host = ((rv.getStatus().getStatusCode().equals(StatusCode.TIMEDOUT) && rv.getOperation() != null) ? getHostName(rv.getOperation().getHandlingNode().getSocketAddress()) : null);
                // Record elapsed wall-clock milliseconds since the future's start time.
                getTimer(EVCacheMetricsFactory.GET_OPERATION, EVCacheMetricsFactory.READ, rv.getStatus(), metricHit, host, getReadMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS);
                rv.signalComplete();
            }
        });
        // Attach the operation and any listener to the future before handing the operation to the connection.
        rv.setOperation(op);
        if (listener != null) rv.addListener(listener);
        mconn.enqueueOperation(key, op);
        return rv;
    }