in src/main/java/com/uber/rss/tools/StreamServerStressTool.java [552:704]
/**
 * Simulates one mapper task attempt writing shuffle data to the stream servers.
 *
 * <p>The attempt optionally delays its start (to stagger concurrent mappers), creates a
 * sync or async write client depending on {@code writeClientQueueSize}, writes a null-value
 * record and an empty-value record (edge cases), then writes random records until the shared
 * {@code totalShuffleWrittenBytes} counter reaches {@code numBytes}. Halfway through, servers
 * listed in {@code serverIdsToShutdownDuringShuffleWrite} are shut down to simulate failures.
 *
 * @param testValues candidate key/value byte arrays (entries may be null to simulate null values)
 * @param appMapId identifier of the simulated map task
 * @param taskAttemptId attempt number for this map task
 * @param isLastTaskAttempt true for the winning attempt; only its records count as "success"
 * @param simulateEmptyData if true, finish the upload without writing any records
 * @param rateCounter shared counter used to report upload throughput
 * @param numPartitionRecords shared per-partition record counts, updated only by the last attempt
 */
private void simulateMapperTask(List<byte[]> testValues,
                                AppMapId appMapId,
                                long taskAttemptId,
                                boolean isLastTaskAttempt,
                                boolean simulateEmptyData,
                                RateCounter rateCounter,
                                ConcurrentHashMap<Integer, AtomicLong> numPartitionRecords) {
    if (mapDelay > 0) {
        int delayMillis = random.nextInt(mapDelay);
        logger.info(String.format("Delaying map %s: %s", appMapId, delayMillis));
        sleepOrAbort(delayMillis);
    }
    ShuffleWriteConfig shuffleWriteConfig = new ShuffleWriteConfig((short)numSplits);
    MultiServerWriteClient writeClient;
    int networkTimeoutMillis = 120 * 1000;
    long maxTryingMillis = networkTimeoutMillis * 3L;
    List<ServerReplicationGroup> serverReplicationGroups = ServerReplicationGroupUtil.createReplicationGroups(serverDetails, numReplicas);
    boolean finishUploadAck = true; // TODO make this configurable
    if (writeClientQueueSize == 0) {
        // Use sync write client (MultiServerSyncWriteClient)
        writeClient = new MultiServerSyncWriteClient(serverReplicationGroups, partitionFanout, networkTimeoutMillis, maxTryingMillis, null, finishUploadAck, useConnectionPool, "user1", appId, appAttempt, shuffleWriteConfig);
    } else {
        // Use async write client (MultiServerAsyncWriteClient)
        writeClient = new MultiServerAsyncWriteClient(serverReplicationGroups, partitionFanout, networkTimeoutMillis, maxTryingMillis, null, finishUploadAck, useConnectionPool, writeClientQueueSize, writeClientThreads, "user1", appId, appAttempt, shuffleWriteConfig);
    }
    writeClient.connect();
    writeClient.startUpload(new AppTaskAttemptId(appMapId, taskAttemptId), numMaps, numPartitions);
    logger.info(String.format("Map %s attempt %s started, write client: %s", appMapId, taskAttemptId, writeClient));
    if (!simulateEmptyData) {
        // Edge case 1: a record with a null value.
        int partitionId = random.nextInt(numPartitions);
        writeClient.writeDataBlock(partitionId, null);
        recordShuffleWrite(partitionId, isLastTaskAttempt, numPartitionRecords);
        // Edge case 2: a record with a zero-length value.
        writeClient.writeDataBlock(partitionId, ByteBuffer.wrap(new byte[0]));
        recordShuffleWrite(partitionId, isLastTaskAttempt, numPartitionRecords);
        // Write random records until the shared byte counter (updated by all concurrent
        // mappers) reaches the target.
        while (totalShuffleWrittenBytes.get() < numBytes) {
            long totalShuffleWrittenBytesOldValue = totalShuffleWrittenBytes.get();
            partitionId = random.nextInt(numPartitions);
            // Key bytes are counted toward the total even though only the value is written.
            byte[] keyData = testValues.get(random.nextInt(testValues.size()));
            if (keyData != null) {
                totalShuffleWrittenBytes.addAndGet(keyData.length);
            }
            byte[] valueData = testValues.get(random.nextInt(testValues.size()));
            if (valueData != null) {
                totalShuffleWrittenBytes.addAndGet(valueData.length);
            }
            recordShuffleWrite(partitionId, isLastTaskAttempt, numPartitionRecords);
            writeClient.writeDataBlock(partitionId,
                valueData == null ? null : ByteBuffer.wrap(valueData));
            if (mapSlowness > 0) {
                sleepOrAbort(mapSlowness);
            }
            maybeShutdownServersAtHalfwayPoint(totalShuffleWrittenBytesOldValue);
        }
    }
    // TODO simulate broken map tasks without proper closing
    try {
        writeClient.finishUpload();
        long bytes = writeClient.getShuffleWriteBytes();
        Double rate = rateCounter.addValueAndGetRate(bytes);
        if (rate != null) {
            long mapUploadedBytes = rateCounter.getOverallValue();
            logger.info(String.format("Map %s uploaded bytes: %s, rate: %s mb/s", appMapId, mapUploadedBytes, rate*(1000.0/(1024.0*1024.0))));
        }
        try {
            logger.info(String.format("Closing write client: %s", writeClient));
            writeClient.close();
        } catch (Exception e) {
            M3Stats.addException(e, M3Stats.TAG_VALUE_STRESS_TOOL);
            throw new RuntimeException(e);
        }
    } catch (Throwable ex) {
        // Best-effort cleanup: do not let a close() failure mask the original error ex.
        try {
            writeClient.close();
        } catch (Throwable closeEx) {
            logger.warn(String.format("Failed to close write client %s after error", writeClient), closeEx);
        }
        M3Stats.addException(ex, M3Stats.TAG_VALUE_STRESS_TOOL);
        if (isLastTaskAttempt) {
            // The winning attempt must surface its failure to the caller.
            throw ex;
        } else {
            // Stale (superseded) attempts are expected to fail; log and continue.
            logger.debug(String.format("Got ignorable error from stale map task: %s", ExceptionUtils.getSimpleMessage(ex)));
        }
    }
    logger.info(String.format("Map %s attempt %s finished", appMapId, taskAttemptId));
    double overallBytesMb = rateCounter.getOverallValue()/(1024.0*1024.0);
    double overallRate = rateCounter.getOverallRate()*(1000.0/(1024.0*1024.0));
    logger.info(String.format("Map %s total uploaded bytes: %s mb, rate: %s mb/s", appMapId, overallBytesMb, overallRate));
}

/**
 * Sleeps for the given number of milliseconds; if interrupted, restores the thread's
 * interrupt status (so callers up the stack can observe it), records the exception,
 * and aborts the task with a RuntimeException.
 */
private void sleepOrAbort(long millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        M3Stats.addException(e, M3Stats.TAG_VALUE_STRESS_TOOL);
        throw new RuntimeException(e);
    }
}

/**
 * Updates the shared byte/record counters for one written shuffle record. Only the last
 * task attempt contributes to the "success" counters and per-partition record counts,
 * mirroring the fact that readers only see data from the winning attempt.
 */
private void recordShuffleWrite(int partitionId, boolean isLastTaskAttempt, ConcurrentHashMap<Integer, AtomicLong> numPartitionRecords) {
    totalShuffleWrittenBytes.addAndGet(SHUFFLE_RECORD_EXTRA_BYTES);
    totalShuffleWrittenRecords.incrementAndGet();
    if (isLastTaskAttempt) {
        successShuffleWrittenRecords.incrementAndGet();
        usedPartitionIds.putIfAbsent(partitionId, partitionId);
        numPartitionRecords.computeIfAbsent(partitionId, k -> new AtomicLong()).incrementAndGet();
    }
}

/**
 * Simulates bad servers mid-write: the first time the shared byte counter crosses
 * numBytes/2 (detected via the caller's pre-write snapshot), shuts down the configured
 * servers and replaces them with null in the live server list.
 */
private void maybeShutdownServersAtHalfwayPoint(long totalShuffleWrittenBytesOldValue) {
    long hitPointToShutdownServers = numBytes / 2;
    if (totalShuffleWrittenBytesOldValue < hitPointToShutdownServers && totalShuffleWrittenBytes.get() >= hitPointToShutdownServers) {
        synchronized (servers) {
            synchronized (serverIdsToShutdownDuringShuffleWrite) {
                for (String serverId : serverIdsToShutdownDuringShuffleWrite) {
                    StreamServer server = servers.stream()
                        .filter(t -> t != null)
                        .filter(t -> t.getServerId().equals(serverId))
                        .findFirst()
                        .orElseThrow(() -> new java.util.NoSuchElementException(
                            String.format("No running server found with id %s to shut down", serverId)));
                    logger.info(String.format("Simulate bad server during shuffle write by shutting down server: %s", server));
                    shutdownServer(server);
                    int index = servers.indexOf(server);
                    servers.set(index, null);
                }
                serverIdsToShutdownDuringShuffleWrite.clear();
            }
        }
    }
}