in evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java [484:564]
public <T> OperationFuture<Boolean> asyncAppendOrAdd(final String key, int exp, CachedData co, EVCacheLatch evcacheLatch) {
final CountDownLatch latch = new CountDownLatch(1);
if(co != null && co.getData() != null) getDataSizeDistributionSummary(EVCacheMetricsFactory.AOA_OPERATION, EVCacheMetricsFactory.WRITE, EVCacheMetricsFactory.IPC_SIZE_OUTBOUND).record(co.getData().length);
final EVCacheOperationFuture<Boolean> rv = new EVCacheOperationFuture<Boolean>(key, latch, new AtomicReference<Boolean>(null), operationTimeout, executorService, client);
final Operation opAppend = opFact.cat(ConcatenationType.append, 0, key, co.getData(), new OperationCallback() {
boolean appendSuccess = false;
@Override
public void receivedStatus(OperationStatus val) {
if (log.isDebugEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName)) log.debug("AddOrAppend Key (Append Operation): " + key + "; Status : " + val.getStatusCode().name()
+ "; Message : " + val.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime()));
if (val.getStatusCode().equals(StatusCode.SUCCESS)) {
rv.set(Boolean.TRUE, val);
appendSuccess = true;
}
}
@Override
public void complete() {
if(appendSuccess) {
final String host = ((rv.getStatus().getStatusCode().equals(StatusCode.TIMEDOUT) && rv.getOperation() != null) ? getHostName(rv.getOperation().getHandlingNode().getSocketAddress()) : null);
getTimer(EVCacheMetricsFactory.AOA_OPERATION_APPEND, EVCacheMetricsFactory.WRITE, rv.getStatus(), EVCacheMetricsFactory.YES, host, getWriteMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS);;
latch.countDown();
rv.signalComplete();
} else {
Operation opAdd = opFact.store(StoreType.add, key, co.getFlags(), exp, co.getData(), new StoreOperation.Callback() {
@Override
public void receivedStatus(OperationStatus addStatus) {
if (log.isDebugEnabled() && client.getPool().getEVCacheClientPoolManager().shouldLog(appName)) log.debug("AddOrAppend Key (Add Operation): " + key + "; Status : " + addStatus.getStatusCode().name()
+ "; Message : " + addStatus.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime()));
if(addStatus.isSuccess()) {
appendSuccess = true;
rv.set(addStatus.isSuccess(), addStatus);
} else {
Operation opReappend = opFact.cat(ConcatenationType.append, 0, key, co.getData(), new OperationCallback() {
public void receivedStatus(OperationStatus retryAppendStatus) {
if (retryAppendStatus.getStatusCode().equals(StatusCode.SUCCESS)) {
rv.set(Boolean.TRUE, retryAppendStatus);
if (log.isDebugEnabled()) log.debug("AddOrAppend Retry append Key (Append Operation): " + key + "; Status : " + retryAppendStatus.getStatusCode().name()
+ "; Message : " + retryAppendStatus.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime()));
} else {
rv.set(Boolean.FALSE, retryAppendStatus);
}
}
public void complete() {
final String host = ((rv.getStatus().getStatusCode().equals(StatusCode.TIMEDOUT) && rv.getOperation() != null) ? getHostName(rv.getOperation().getHandlingNode().getSocketAddress()) : null);
getTimer(EVCacheMetricsFactory.AOA_OPERATION_REAPPEND, EVCacheMetricsFactory.WRITE, rv.getStatus(), EVCacheMetricsFactory.YES, host, getWriteMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS);
latch.countDown();
rv.signalComplete();
}
});
rv.setOperation(opReappend);
mconn.enqueueOperation(key, opReappend);
}
}
@Override
public void gotData(String key, long cas) {
rv.setCas(cas);
}
@Override
public void complete() {
if(appendSuccess) {
final String host = ((rv.getStatus().getStatusCode().equals(StatusCode.TIMEDOUT) && rv.getOperation() != null) ? getHostName(rv.getOperation().getHandlingNode().getSocketAddress()) : null);
getTimer(EVCacheMetricsFactory.AOA_OPERATION_ADD, EVCacheMetricsFactory.WRITE, rv.getStatus(), EVCacheMetricsFactory.YES, host, getWriteMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS);
latch.countDown();
rv.signalComplete();
}
}
});
rv.setOperation(opAdd);
mconn.enqueueOperation(key, opAdd);
}
}
});
rv.setOperation(opAppend);
mconn.enqueueOperation(key, opAppend);
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl && !client.isInWriteOnly()) ((EVCacheLatchImpl) evcacheLatch).addFuture(rv);
return rv;
}