private void simulateMapperTask()

in src/main/java/com/uber/rss/tools/StreamServerStressTool.java [552:704]


    /**
     * Simulates a single mapper task attempt: optionally delays startup, writes
     * randomized shuffle records through a sync or async write client until the
     * configured {@code numBytes} budget is reached, then finishes the upload and
     * closes the client. Halfway through the byte budget it may shut down servers
     * listed in {@code serverIdsToShutdownDuringShuffleWrite} to simulate failures.
     *
     * @param testValues          pool of candidate payloads; entries may be {@code null}
     * @param appMapId            identifier of the app/map this task belongs to
     * @param taskAttemptId       attempt id for this task
     * @param isLastTaskAttempt   only the last attempt updates success counters and
     *                            partition stats; failures from stale attempts are
     *                            logged and ignored
     * @param simulateEmptyData   when true, upload no records at all
     * @param rateCounter         shared counter used to report upload byte rate
     * @param numPartitionRecords per-partition record counts, updated only for the
     *                            last task attempt
     * @throws RuntimeException if sleeping is interrupted (interrupt flag restored)
     */
    private void simulateMapperTask(List<byte[]> testValues,
                                    AppMapId appMapId,
                                    long taskAttemptId,
                                    boolean isLastTaskAttempt,
                                    boolean simulateEmptyData,
                                    RateCounter rateCounter,
                                    ConcurrentHashMap<Integer, AtomicLong> numPartitionRecords) {
        if (mapDelay > 0) {
            int delayMillis = random.nextInt(mapDelay);
            logger.info(String.format("Delaying map %s: %s", appMapId, delayMillis));
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so upstream code can observe the interruption.
                Thread.currentThread().interrupt();
                M3Stats.addException(e, M3Stats.TAG_VALUE_STRESS_TOOL);
                throw new RuntimeException(e);
            }
        }

        ShuffleWriteConfig shuffleWriteConfig = new ShuffleWriteConfig((short)numSplits);

        MultiServerWriteClient writeClient;
        int networkTimeoutMillis = 120 * 1000;
        long maxTryingMillis = 3L * networkTimeoutMillis;
        List<ServerReplicationGroup> serverReplicationGroups = ServerReplicationGroupUtil.createReplicationGroups(serverDetails, numReplicas);
        boolean finishUploadAck = true; // TODO make this configurable
        if (writeClientQueueSize == 0) {
            // Use sync write client (MultiServerSyncWriteClient)
            writeClient = new MultiServerSyncWriteClient(serverReplicationGroups, partitionFanout, networkTimeoutMillis, maxTryingMillis, null, finishUploadAck, useConnectionPool, "user1", appId, appAttempt, shuffleWriteConfig);
        } else {
            // Use async write client (MultiServerAsyncWriteClient)
            writeClient = new MultiServerAsyncWriteClient(serverReplicationGroups, partitionFanout, networkTimeoutMillis, maxTryingMillis, null, finishUploadAck, useConnectionPool, writeClientQueueSize, writeClientThreads, "user1", appId, appAttempt, shuffleWriteConfig);
        }
        // Connect/startUpload is identical for both client flavors; do it once.
        writeClient.connect();
        writeClient.startUpload(new AppTaskAttemptId(appMapId, taskAttemptId), numMaps, numPartitions);

        logger.info(String.format("Map %s attempt %s started, write client: %s", appMapId, taskAttemptId, writeClient));

        if (!simulateEmptyData) {
            // Exercise the null-payload and empty-payload paths once each before
            // the main write loop.
            int partitionId = random.nextInt(numPartitions);
            writeClient.writeDataBlock(partitionId, null);

            totalShuffleWrittenBytes.addAndGet(SHUFFLE_RECORD_EXTRA_BYTES);
            totalShuffleWrittenRecords.incrementAndGet();

            if (isLastTaskAttempt) {
                successShuffleWrittenRecords.incrementAndGet();
                usedPartitionIds.putIfAbsent(partitionId, partitionId);
                numPartitionRecords.computeIfAbsent(partitionId, k -> new AtomicLong()).incrementAndGet();
            }

            writeClient.writeDataBlock(partitionId, ByteBuffer.wrap(new byte[0]));

            totalShuffleWrittenBytes.addAndGet(SHUFFLE_RECORD_EXTRA_BYTES);
            totalShuffleWrittenRecords.incrementAndGet();

            if (isLastTaskAttempt) {
                successShuffleWrittenRecords.incrementAndGet();
                usedPartitionIds.putIfAbsent(partitionId, partitionId);
                numPartitionRecords.computeIfAbsent(partitionId, k -> new AtomicLong()).incrementAndGet();
            }

            while (totalShuffleWrittenBytes.get() < numBytes) {
                long totalShuffleWrittenBytesOldValue = totalShuffleWrittenBytes.get();

                partitionId = random.nextInt(numPartitions);

                // NOTE(review): keyData contributes to the byte budget but is never
                // written to the client below — confirm this accounting is intended.
                byte[] keyData = testValues.get(random.nextInt(testValues.size()));
                if (keyData != null) {
                    totalShuffleWrittenBytes.addAndGet(keyData.length);
                }

                byte[] valueData = testValues.get(random.nextInt(testValues.size()));
                if (valueData != null) {
                    totalShuffleWrittenBytes.addAndGet(valueData.length);
                }

                totalShuffleWrittenBytes.addAndGet(SHUFFLE_RECORD_EXTRA_BYTES);
                totalShuffleWrittenRecords.incrementAndGet();

                if (isLastTaskAttempt) {
                    successShuffleWrittenRecords.incrementAndGet();
                    usedPartitionIds.putIfAbsent(partitionId, partitionId);
                    numPartitionRecords.computeIfAbsent(partitionId, k -> new AtomicLong()).incrementAndGet();
                }

                writeClient.writeDataBlock(partitionId,
                    valueData == null ? null : ByteBuffer.wrap(valueData));

                if (mapSlowness > 0) {
                    try {
                        Thread.sleep(mapSlowness);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so upstream code can observe the interruption.
                        Thread.currentThread().interrupt();
                        M3Stats.addException(e, M3Stats.TAG_VALUE_STRESS_TOOL);
                        throw new RuntimeException(e);
                    }
                }

                // When the cumulative byte counter crosses the halfway mark, shut
                // down the configured servers exactly once to simulate failures
                // mid-shuffle-write.
                long hitPointToShutdownServers = numBytes/2;
                if (totalShuffleWrittenBytesOldValue < hitPointToShutdownServers && totalShuffleWrittenBytes.get() >= hitPointToShutdownServers) {
                    synchronized (servers) {
                        synchronized (serverIdsToShutdownDuringShuffleWrite) {
                            for (String serverId : serverIdsToShutdownDuringShuffleWrite) {
                                StreamServer server = servers.stream()
                                    .filter(t -> t != null)
                                    .filter(t -> t.getServerId().equals(serverId))
                                    .findFirst()
                                    .orElseThrow(() -> new IllegalStateException(
                                        String.format("Could not find running server to shut down: %s", serverId)));
                                logger.info(String.format("Simulate bad server during shuffle write by shutting down server: %s", server));
                                shutdownServer(server);

                                // Keep the slot (set to null) so indices of other servers stay stable.
                                int index = servers.indexOf(server);
                                servers.set(index, null);
                            }
                            serverIdsToShutdownDuringShuffleWrite.clear();
                        }
                    }
                }
            }
        }

        // TODO simulate broken map tasks without proper closing

        try {
            writeClient.finishUpload();

            long bytes = writeClient.getShuffleWriteBytes();
            Double rate = rateCounter.addValueAndGetRate(bytes);
            if (rate != null) {
                long mapUploadedBytes = rateCounter.getOverallValue();
                logger.info(String.format("Map %s uploaded bytes: %s, rate: %s mb/s", appMapId, mapUploadedBytes, rate*(1000.0/(1024.0*1024.0))));
            }

            logger.info(String.format("Closing write client: %s", writeClient));
            writeClient.close();
        } catch (Throwable ex) {
            M3Stats.addException(ex, M3Stats.TAG_VALUE_STRESS_TOOL);
            // Close quietly on the failure path so a close() error cannot mask the
            // original exception (the old code could also close the client twice).
            try {
                writeClient.close();
            } catch (Throwable closeEx) {
                M3Stats.addException(closeEx, M3Stats.TAG_VALUE_STRESS_TOOL);
            }
            if (isLastTaskAttempt) {
                throw ex;
            } else {
                // Stale (non-final) attempts are expected to race and may fail benignly.
                logger.debug(String.format("Got ignorable error from stale map task: %s", ExceptionUtils.getSimpleMessage(ex)));
            }
        }

        logger.info(String.format("Map %s attempt %s finished", appMapId, taskAttemptId));

        double overallBytesMb = rateCounter.getOverallValue()/(1024.0*1024.0);
        double overallRate = rateCounter.getOverallRate()*(1000.0/(1024.0*1024.0));
        logger.info(String.format("Map %s total uploaded bytes: %s mb, rate: %s mb/s", appMapId, overallBytesMb, overallRate));
    }