// private def simulateMapperTask()
//
// in src/main/scala/org/apache/spark/shuffle/rss/RssStressTool.scala [293:352]

  /**
   * Simulates one mapper task attempt for the RSS stress tool: optionally
   * delays the start, connects a (sync or async) shuffle write client, writes
   * randomly paired records through an [[RssShuffleWriter]], and registers the
   * resulting map output with the map output tracker.
   *
   * @param testValues        source values; each record pairs a value with a
   *                          randomly chosen value from the same list
   * @param appMapId          identifier of the simulated map task
   * @param taskAttemptId     attempt id of this simulated task
   * @param isLastTaskAttempt whether this is the final attempt; only then are
   *                          the written records added to the success counter
   */
  private def simulateMapperTask(testValues: util.List[String], appMapId: AppMapId, taskAttemptId: Long, isLastTaskAttempt: Boolean): Unit = {
    // Simulate staggered task scheduling with a random start delay.
    if (mapDelay > 0) {
      val delayMillis = random.nextInt(mapDelay)
      logInfo(s"Delaying map $appMapId: $delayMillis")
      Thread.sleep(delayMillis)
    }

    val shuffleWriteConfig = new ShuffleWriteConfig(numSplits.toShort)

    val networkTimeoutMillis = 120 * 1000
    val maxTryingMillis = networkTimeoutMillis * 3
    val serverReplicationGroups = ServerReplicationGroupUtil.createReplicationGroups(serverDetails, numReplicas)
    val finishUploadAck = true // TODO make this configurable

    // Choose a synchronous client when no write queue is configured, otherwise
    // an asynchronous one. Written as an if-expression so writeClient is an
    // immutable val rather than a null-initialized var.
    val writeClient: ShuffleDataWriter =
      if (writeClientQueueSize == 0) {
        val client = new MultiServerSyncWriteClient(serverReplicationGroups, partitionFanout, networkTimeoutMillis, maxTryingMillis, null, finishUploadAck, useConnectionPool, "user1", appId, appAttempt, shuffleWriteConfig)
        client.connect()
        client
      } else {
        val client = new MultiServerAsyncWriteClient(serverReplicationGroups, partitionFanout, networkTimeoutMillis, maxTryingMillis, null, finishUploadAck, useConnectionPool, writeClientQueueSize, writeClientThreads, "user1", appId, appAttempt, shuffleWriteConfig)
        client.connect()
        client
      }

    val shuffleWriter = new RssShuffleWriter(
      user = "user1",
      rssServers = new ServerList(serverDetails),
      writeClient = writeClient,
      mapInfo = new AppTaskAttemptId(appMapId, taskAttemptId),
      numMaps = numMaps,
      serializer = new KryoSerializer(sparkConf),
      compressionOptions = CompressionOptions(),
      compression = Compression.COMPRESSION_CODEC_LZ4,
      bufferOptions = BufferManagerOptions(writerBufferSize, 256 * 1024 * 1024, writerBufferSpill),
      shuffleDependency = shuffleDependency,
      stageMetrics = new ShuffleClientStageMetrics(new ShuffleClientStageMetricsKey("user1", "queue=1")),
      taskMetrics = new TaskMetrics(),
      conf = sparkConf
    )

    logInfo(s"Map $appMapId attempt $taskAttemptId started, write client: $writeClient")

    // Build one record per input value, pairing it with a randomly selected
    // value from the same list.
    val records = testValues.asScala.map { t =>
      val index = random.nextInt(testValues.size())
      (t, testValues.get(index))
    }

    shuffleWriter.write(records.iterator)
    if (isLastTaskAttempt) {
      // Only the final attempt's records count toward the success total.
      successShuffleWrittenRecords.addAndGet(records.size)
    }

    // NOTE(review): mapStatus.get assumes stop(true) always returns a defined
    // MapStatus on success — confirm against RssShuffleWriter.stop.
    val mapStatus = shuffleWriter.stop(true)
    mapOutputTrackerMaster.registerMapOutput(appShuffleId.getShuffleId, appMapId.getMapId, mapStatus.get)

    // TODO simulate broken map tasks without proper closing
    logInfo(s"Map $appMapId attempt $taskAttemptId finished")
  }