in evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java [464:535]
/**
 * Asynchronously reads the value stored under {@code key}, stitching it back together from its
 * chunk entries when the chunk details report the value as chunked.
 *
 * <p>The emitted item is {@code null} when the chunk details are {@code null}, when a non-chunked
 * entry carries no data, when the bulk read returns a number of chunks different from
 * {@code ChunkInfo.getChunks() - 1}, when a returned chunk has no payload, or when
 * {@code checkCRCChecksum} rejects the reassembled bytes.
 *
 * @param key       key whose chunk details are looked up via {@code getChunkDetails}
 * @param touch     when {@code true}, {@code touch(chunkKey, ttl)} is issued for every chunk that is copied
 * @param ttl       value passed to {@code touch}; only used when {@code touch} is {@code true}
 * @param tc        transcoder used to decode the final value; when {@code null} the transcoder of
 *                  {@code evcacheMemcachedClient} is used instead
 * @param hasZF     forwarded unchanged to {@code checkCRCChecksum}
 * @param scheduler forwarded to {@code getChunkDetails} and to the bulk read's {@code getSome}
 * @param <T>       type of the decoded value
 * @return a {@code Single} emitting the decoded value, or {@code null} in the cases listed above
 */
private <T> Single<T> assembleChunks(String key, boolean touch, int ttl, Transcoder<T> tc, boolean hasZF, Scheduler scheduler) {
    return getChunkDetails(key, scheduler).flatMap(cd -> {
        if (cd == null) return Single.just(null);

        if (!cd.isChunked()) {
            // Plain (non-chunked) entry: decode the payload directly.
            if (cd.getData() == null) return Single.just(null);
            final Transcoder<T> transcoder = (tc == null ? (Transcoder<T>) evcacheMemcachedClient.getTranscoder() : tc);
            return Single.just(transcoder.decode((CachedData) cd.getData()));
        }

        final List<String> keys = cd.getChunkKeys();
        final ChunkInfo ci = cd.getChunkInfo();
        return evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null)
                .getSome(readTimeout.get(), TimeUnit.MILLISECONDS, false, false, scheduler)
                .map(dataMap -> {
                    // The number of data chunks fetched must be exactly getChunks() - 1.
                    if (dataMap.size() != ci.getChunks() - 1) {
                        incrementFailure(EVCacheMetricsFactory.INCORRECT_CHUNKS, null);
                        return null;
                    }

                    // All chunks but the last are full sized; a last-chunk size of 0 means the
                    // last chunk is full sized as well.
                    final int lastChunkSize = (ci.getLastChunk() == 0 ? ci.getChunkSize() : ci.getLastChunk());
                    final byte[] data = new byte[(ci.getChunks() - 2) * ci.getChunkSize() + lastChunkSize];

                    final int lastIndex = keys.size() - 1;
                    int index = 0;
                    for (int i = 0; i < keys.size(); i++) {
                        final String _key = keys.get(i);
                        final CachedData _cd = dataMap.get(_key);
                        if (log.isDebugEnabled()) log.debug("Chunk Key " + _key + "; Value : " + _cd);
                        if (_cd == null) continue;

                        final byte[] val = _cd.getData();
                        // If we expect a chunk to be present and it is null then return null immediately.
                        if (val == null) return null;

                        final boolean isLast = (i == lastIndex);
                        // The last chunk is trimmed to the recorded last-chunk size (falling back to
                        // the chunk size when that value is 0 or larger than a chunk); every other
                        // chunk is copied in full.
                        final int len = isLast
                                ? ((ci.getLastChunk() == 0 || ci.getLastChunk() > ci.getChunkSize())
                                        ? ci.getChunkSize() : ci.getLastChunk())
                                : val.length;
                        if (len != ci.getChunkSize() && !isLast) {
                            incrementFailure(EVCacheMetricsFactory.INVALID_CHUNK_SIZE, null);
                            if (log.isWarnEnabled()) log.warn("CHUNK_SIZE_ERROR : Chunks : " + ci.getChunks() + " ; "
                                    + "length : " + len + "; expectedLength : " + ci.getChunkSize() + " for key : " + _key);
                        }

                        if (len > 0) {
                            try {
                                // Single copy per chunk; the previous code repeated this copy after the try block.
                                System.arraycopy(val, 0, data, index, len);
                            } catch (RuntimeException e) {
                                if (log.isWarnEnabled()) {
                                    final StringBuilder sb = new StringBuilder();
                                    sb.append("ArrayCopyError - Key : " + _key + "; final data Size : " + data.length
                                            + "; copy array size : " + len + "; val size : " + val.length
                                            + "; key index : " + i + "; copy from : " + index + "; ChunkInfo : " + ci + "\n");
                                    // Null-safe dump of every chunk size so that a missing chunk cannot
                                    // raise a NullPointerException that hides the original failure.
                                    for (final String chunkKey : keys) {
                                        final CachedData chunk = dataMap.get(chunkKey);
                                        final byte[] chunkBytes = (chunk == null ? null : chunk.getData());
                                        sb.append(chunkKey).append('=');
                                        if (chunkBytes == null) {
                                            sb.append("null");
                                        } else {
                                            sb.append(chunkBytes.length);
                                        }
                                        sb.append('\n');
                                    }
                                    log.warn(sb.toString(), e);
                                }
                                throw e;
                            }
                            // Advance by the number of bytes actually written.
                            index += len;
                            if (touch) evcacheMemcachedClient.touch(_key, ttl);
                        }
                    }

                    // Reject the reassembled payload when the checksum does not match.
                    if (!checkCRCChecksum(data, ci, hasZF)) return null;

                    final Transcoder<T> transcoder = (tc == null ? (Transcoder<T>) evcacheMemcachedClient.getTranscoder()
                            : tc);
                    return transcoder.decode(new CachedData(ci.getFlags(), data, Integer.MAX_VALUE));
                });
    });
}