static void runClient()

in tensorpipe/benchmark/benchmark_pipe.cc [639:713]


// Runs the client side of the pipe benchmark.
//
// Builds the expected and temporary payload/tensor buffers described by
// `options`, registers the requested transport and channel on a fresh Context,
// connects a pipe to `options.address`, and hands everything over to
// clientPingPongNonBlock. The call then blocks until `doneProm` is fulfilled
// (presumably by clientPingPongNonBlock once all rounds are done - confirm
// against its definition) and finally joins the context.
//
// When built with USE_NCCL, an NCCL unique id is generated here and written to
// the pipe as the metadata of a first message, after which the communicator is
// created with rank 1 in a world of size 2.
static void runClient(const Options& options) {
  std::string addr = options.address;
  int numWarmUps = kNumWarmUpRounds;
  int numRoundTrips = options.numRoundTrips;

  Data data;

  // Payloads: one expected buffer, one metadata string and one scratch buffer
  // per payload.
  data.numPayloads = options.numPayloads;
  data.payloadSize = options.payloadSize;
  for (size_t payloadIdx = 0; payloadIdx < options.numPayloads; payloadIdx++) {
    data.expectedPayload.push_back(createFullCpuData(options.payloadSize));
    data.expectedPayloadMetadata.push_back(
        std::string(options.metadataSize, 0x42));
    data.temporaryPayload.push_back(createEmptyCpuData(options.payloadSize));
  }

  // Tensors: same layout as the payloads, but the buffers are allocated
  // according to the requested tensor type.
  data.numTensors = options.numTensors;
  data.tensorSize = options.tensorSize;
  data.tensorType = options.tensorType;
  for (size_t tensorIdx = 0; tensorIdx < options.numTensors; tensorIdx++) {
    data.expectedTensorMetadata.push_back(
        std::string(options.metadataSize, 0x42));
    if (data.tensorType == TensorType::kCpu) {
      data.expectedCpuTensor.push_back(createFullCpuData(options.tensorSize));
      data.temporaryCpuTensor.push_back(createEmptyCpuData(options.tensorSize));
    } else if (data.tensorType == TensorType::kCuda) {
      data.expectedCudaTensor.push_back(createFullCudaData(options.tensorSize));
      data.temporaryCudaTensor.push_back(
          createEmptyCudaData(options.tensorSize));
      // NOTE(review): this runs once per tensor, so the stream is re-created
      // and reassigned on every iteration; only the last one is kept. It looks
      // like a single stream would suffice - confirm before hoisting.
      data.cudaStream = createCudaStream();
    } else {
      TP_THROW_ASSERT() << "Unknown tensor type";
    }
  }
  data.cudaSyncPeriod = options.cudaSyncPeriod;
  data.expectedMetadata = std::string(options.metadataSize, 0x42);

  MultiDeviceMeasurements measurements;
  measurements.cpu.reserve(options.numRoundTrips);
  // reserve() is only a capacity hint, so skip it when the sync period is not
  // positive: dividing by zero would be undefined behavior, and a negative
  // period would turn into an enormous reservation request.
  if (data.cudaSyncPeriod > 0) {
    measurements.cuda.reserve(options.numRoundTrips / data.cudaSyncPeriod);
  }

  std::shared_ptr<Context> context = std::make_shared<Context>();
  auto transportContext =
      TensorpipeTransportRegistry().create(options.transport);
  validateTransportContext(transportContext);
  context->registerTransport(0, options.transport, transportContext);

  auto channelContext = TensorpipeChannelRegistry().create(options.channel);
  validateChannelContext(channelContext);
  context->registerChannel(0, options.channel, channelContext);

  std::shared_ptr<Pipe> pipe = context->connect(addr);

#if USE_NCCL
  // Generate the NCCL unique id on this side and ship its raw bytes to the
  // peer as message metadata.
  ncclUniqueId uniqueId;
  TP_NCCL_CHECK(ncclGetUniqueId(&uniqueId));
  Message message;
  message.metadata = std::string(
      reinterpret_cast<char*>(&uniqueId),
      reinterpret_cast<char*>(&uniqueId) + sizeof(ncclUniqueId));
  std::promise<void> promise;
  // Capturing by reference is safe here because we block on the promise right
  // below, so the locals outlive the callback.
  pipe->write(std::move(message), [&](const Error& error) {
    TP_THROW_ASSERT_IF(error) << error.what();
    promise.set_value();
  });
  promise.get_future().get();

  data.ncclComm = createNcclComm(/*rank=*/1, /*worldSize=*/2, uniqueId);
#endif // USE_NCCL

  std::promise<void> doneProm;
  clientPingPongNonBlock(
      std::move(pipe), numWarmUps, numRoundTrips, doneProm, data, measurements);

  // Wait for the benchmark to signal completion before tearing the context
  // down.
  doneProm.get_future().get();
  context->join();
}