private Single assembleChunks()

in evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java [464:535]


    /**
     * Asynchronously reads the value stored under {@code key}, re-assembling it from its chunks when
     * the stored entry is chunked.
     *
     * <p>The chunk details are obtained from {@code getChunkDetails(key, scheduler)}. A non-chunked entry
     * is decoded directly. For a chunked entry the chunk keys are fetched in bulk (bounded by
     * {@code readTimeout}), their payloads are concatenated in key order, the result is validated with
     * {@code checkCRCChecksum} and finally decoded.
     *
     * <p>The returned {@code Single} emits {@code null} when any of the following holds:
     * <ul>
     *   <li>no chunk details are available, or a non-chunked entry carries no data;</li>
     *   <li>the bulk fetch did not return exactly {@code ci.getChunks() - 1} entries
     *       (recorded as {@code INCORRECT_CHUNKS});</li>
     *   <li>a returned chunk has a {@code null} payload;</li>
     *   <li>the checksum validation fails.</li>
     * </ul>
     *
     * @param key       the key whose value is to be read
     * @param touch     when {@code true}, every chunk key that is copied is touched with {@code ttl}
     * @param ttl       the time-to-live passed to {@code touch}; only used when {@code touch} is {@code true}
     * @param tc        the transcoder used to decode the value; when {@code null} the transcoder of
     *                  {@code evcacheMemcachedClient} is used
     * @param hasZF     forwarded unchanged to {@code checkCRCChecksum}
     * @param scheduler the scheduler handed to {@code getChunkDetails} and to the bulk fetch
     * @param <T>       the decoded value type
     * @return a {@code Single} emitting the decoded value, or {@code null} as described above
     */
    private <T> Single<T> assembleChunks(String key, boolean touch, int ttl, Transcoder<T> tc, boolean hasZF, Scheduler scheduler) {
        return getChunkDetails(key, scheduler).flatMap(cd -> {
            if (cd == null) return Single.just(null);
            if (!cd.isChunked()) {
                if (cd.getData() == null) return Single.just(null);
                final Transcoder<T> transcoder = (tc == null ? (Transcoder<T>) evcacheMemcachedClient.getTranscoder() : tc);
                return Single.just(transcoder.decode((CachedData) cd.getData()));
            } else {
                final List<String> keys = cd.getChunkKeys();
                final ChunkInfo ci = cd.getChunkInfo();

                return evcacheMemcachedClient.asyncGetBulk(keys, chunkingTranscoder, null)
                    .getSome(readTimeout.get(), TimeUnit.MILLISECONDS, false, false, scheduler)
                    .map(dataMap -> {
                        // A partial bulk result cannot be re-assembled; record it and report a miss.
                        if (dataMap.size() != ci.getChunks() - 1) {
                            incrementFailure(EVCacheMetricsFactory.INCORRECT_CHUNKS, null);
                            return null;
                        }

                        // All chunks but the last are expected to be chunkSize bytes; a last-chunk size of 0
                        // is treated as a full chunk.
                        final int lastChunkBytes = (ci.getLastChunk() == 0 ? ci.getChunkSize() : ci.getLastChunk());
                        final byte[] data = new byte[(ci.getChunks() - 2) * ci.getChunkSize() + lastChunkBytes];
                        final int lastIndex = keys.size() - 1;
                        int index = 0;
                        for (int i = 0; i < keys.size(); i++) {
                            final String _key = keys.get(i);
                            final CachedData _cd = dataMap.get(_key);
                            if (log.isDebugEnabled()) log.debug("Chunk Key " + _key + "; Value : " + _cd);
                            if (_cd == null) continue;

                            final byte[] val = _cd.getData();

                            // If we expect a chunk to be present and it is null then return null immediately.
                            if (val == null) return null;

                            // The last chunk is copied using the size recorded in ChunkInfo (capped at
                            // chunkSize); every other chunk is copied in full.
                            final int len;
                            if (i == lastIndex) {
                                len = (ci.getLastChunk() == 0 || ci.getLastChunk() > ci.getChunkSize())
                                    ? ci.getChunkSize() : ci.getLastChunk();
                            } else {
                                len = val.length;
                            }
                            if (len != ci.getChunkSize() && i != lastIndex) {
                                incrementFailure(EVCacheMetricsFactory.INVALID_CHUNK_SIZE, null);
                                if (log.isWarnEnabled()) log.warn("CHUNK_SIZE_ERROR : Chunks : " + ci.getChunks() + " ; "
                                    + "length : " + len + "; expectedLength : " + ci.getChunkSize() + " for key : " + _key);
                            }
                            if (len > 0) {
                                try {
                                    // Single copy of this chunk into the assembled buffer.
                                    System.arraycopy(val, 0, data, index, len);
                                } catch (RuntimeException e) {
                                    if (log.isWarnEnabled()) {
                                        final StringBuilder sb = new StringBuilder();
                                        sb.append("ArrayCopyError - Key : " + _key + "; final data Size : " + data.length
                                            + "; copy array size : " + len + "; val size : " + val.length
                                            + "; key index : " + i + "; copy from : " + index + "; ChunkInfo : " + ci + "\n");
                                        // Null-safe dump of every chunk size so that a missing chunk cannot
                                        // mask the copy failure with a NullPointerException.
                                        for (final String skey : keys) {
                                            final CachedData scd = dataMap.get(skey);
                                            final byte[] sval = (scd == null) ? null : scd.getData();
                                            sb.append(skey).append('=')
                                                .append(sval == null ? "null" : String.valueOf(sval.length))
                                                .append('\n');
                                        }
                                        log.warn(sb.toString(), e);
                                    }
                                    throw e;
                                }

                                // Only the final chunk can have len != val.length, and the offset is not
                                // read again after it, so advancing by the copied length is equivalent.
                                index += len;
                                if (touch) evcacheMemcachedClient.touch(_key, ttl);
                            }
                        }

                        final boolean checksumPass = checkCRCChecksum(data, ci, hasZF);
                        if (!checksumPass) return null;
                        final Transcoder<T> transcoder = (tc == null ? (Transcoder<T>) evcacheMemcachedClient.getTranscoder()
                            : tc);
                        return transcoder.decode(new CachedData(ci.getFlags(), data, Integer.MAX_VALUE));
                    });
            }
        });
    }