in src/main/scala/org/apache/spark/shuffle/rss/RssStressTool.scala [293:352]
/**
 * Simulates one map-task attempt against the RSS servers: builds a shuffle
 * write client (sync or async depending on configuration), writes randomly
 * paired records derived from `testValues`, then registers the resulting map
 * output with the map-output tracker.
 *
 * @param testValues        pool of test strings; each element is paired with a
 *                          randomly chosen element from the same pool to form a record
 * @param appMapId          identifier of the simulated map task
 * @param taskAttemptId     attempt number for this map task
 * @param isLastTaskAttempt when true, this attempt's record count is added to
 *                          the success counter (only the final attempt "counts")
 */
private def simulateMapperTask(testValues: util.List[String], appMapId: AppMapId, taskAttemptId: Long, isLastTaskAttempt: Boolean): Unit = {
  // Optional random start delay to stagger concurrent simulated mappers.
  if (mapDelay > 0) {
    val delayMillis = random.nextInt(mapDelay)
    logInfo(s"Delaying map $appMapId: $delayMillis")
    Thread.sleep(delayMillis)
  }

  val shuffleWriteConfig = new ShuffleWriteConfig(numSplits.toShort)
  val networkTimeoutMillis = 120 * 1000
  // Allow up to three network-timeout windows of retrying before giving up.
  val maxTryingMillis = networkTimeoutMillis * 3
  val serverReplicationGroups = ServerReplicationGroupUtil.createReplicationGroups(serverDetails, numReplicas)
  val finishUploadAck = true // TODO make this configurable

  // Expression-oriented construction instead of a null-initialized var:
  // a queue size of 0 means synchronous writes; otherwise use the async
  // client backed by a bounded queue and writer threads.
  val writeClient: ShuffleDataWriter =
    if (writeClientQueueSize == 0) {
      val aClient = new MultiServerSyncWriteClient(serverReplicationGroups, partitionFanout, networkTimeoutMillis, maxTryingMillis, null, finishUploadAck, useConnectionPool, "user1", appId, appAttempt, shuffleWriteConfig)
      aClient.connect()
      aClient
    } else {
      val aClient = new MultiServerAsyncWriteClient(serverReplicationGroups, partitionFanout, networkTimeoutMillis, maxTryingMillis, null, finishUploadAck, useConnectionPool, writeClientQueueSize, writeClientThreads, "user1", appId, appAttempt, shuffleWriteConfig)
      aClient.connect()
      aClient
    }

  val shuffleWriter = new RssShuffleWriter(
    user = "user1",
    rssServers = new ServerList(serverDetails),
    writeClient = writeClient,
    mapInfo = new AppTaskAttemptId(appMapId, taskAttemptId),
    numMaps = numMaps,
    serializer = new KryoSerializer(sparkConf),
    compressionOptions = CompressionOptions(),
    compression = Compression.COMPRESSION_CODEC_LZ4,
    bufferOptions = BufferManagerOptions(writerBufferSize, 256 * 1024 * 1024, writerBufferSpill),
    shuffleDependency = shuffleDependency,
    stageMetrics = new ShuffleClientStageMetrics(new ShuffleClientStageMetricsKey("user1", "queue=1")),
    taskMetrics = new TaskMetrics(),
    conf = sparkConf
  )
  logInfo(s"Map $appMapId attempt $taskAttemptId started, write client: $writeClient")

  // Pair every test value with a randomly selected value from the same list;
  // the random partner exercises arbitrary key/value distributions.
  val records = testValues.asScala.map(t => {
    val index = random.nextInt(testValues.size())
    (t, testValues.get(index))
  })
  shuffleWriter.write(records.iterator)
  if (isLastTaskAttempt) {
    successShuffleWrittenRecords.addAndGet(records.size)
  }

  // NOTE(review): stop(true) is expected to return Some(mapStatus) on a
  // successful commit; .get will throw if that assumption is violated —
  // acceptable in a stress tool, but confirm against RssShuffleWriter.stop.
  val mapStatus = shuffleWriter.stop(true)
  mapOutputTrackerMaster.registerMapOutput(appShuffleId.getShuffleId, appMapId.getMapId, mapStatus.get)
  // TODO simulate broken map tasks without proper closing
  logInfo(s"Map $appMapId attempt $taskAttemptId finished")
}