static void runServer()

in tensorpipe/benchmark/benchmark_pipe.cc [345:420]


static void runServer(const Options& options) {
  std::string addr = options.address;
  int numWarmUps = kNumWarmUpRounds;
  int numRoundTrips = options.numRoundTrips;

  Data data;
  data.numPayloads = options.numPayloads;
  data.payloadSize = options.payloadSize;
  for (size_t payloadIdx = 0; payloadIdx < options.numPayloads; payloadIdx++) {
    data.expectedPayload.push_back(createFullCpuData(options.payloadSize));
    data.expectedPayloadMetadata.push_back(
        std::string(options.metadataSize, 0x42));
    data.temporaryPayload.push_back(createEmptyCpuData(options.payloadSize));
  }
  data.numTensors = options.numTensors;
  data.tensorSize = options.tensorSize;
  data.tensorType = options.tensorType;
  for (size_t tensorIdx = 0; tensorIdx < options.numTensors; tensorIdx++) {
    data.expectedTensorMetadata.push_back(
        std::string(options.metadataSize, 0x42));
    if (options.tensorType == TensorType::kCpu) {
      data.expectedCpuTensor.push_back(createFullCpuData(options.tensorSize));
      data.temporaryCpuTensor.push_back(createEmptyCpuData(options.tensorSize));
    } else if (options.tensorType == TensorType::kCuda) {
      data.expectedCudaTensor.push_back(createFullCudaData(options.tensorSize));
      data.temporaryCudaTensor.push_back(
          createEmptyCudaData(options.tensorSize));
      data.cudaStream = createCudaStream();
    } else {
      TP_THROW_ASSERT() << "Unknown tensor type";
    }
  }
  data.cudaSyncPeriod = options.cudaSyncPeriod;
  data.expectedMetadata = std::string(options.metadataSize, 0x42);

  Measurements measurements;
  measurements.reserve(options.numRoundTrips);

  std::shared_ptr<Context> context = std::make_shared<Context>();
  auto transportContext =
      TensorpipeTransportRegistry().create(options.transport);
  validateTransportContext(transportContext);
  context->registerTransport(0, options.transport, transportContext);

  auto channelContext = TensorpipeChannelRegistry().create(options.channel);
  validateChannelContext(channelContext);
  context->registerChannel(0, options.channel, channelContext);

  std::promise<std::shared_ptr<Pipe>> pipeProm;
  std::shared_ptr<Listener> listener = context->listen({addr});
  listener->accept([&](const Error& error, std::shared_ptr<Pipe> pipe) {
    TP_THROW_ASSERT_IF(error) << error.what();
    pipeProm.set_value(std::move(pipe));
  });
  std::shared_ptr<Pipe> pipe = pipeProm.get_future().get();

#if USE_NCCL
  std::promise<ncclUniqueId> uniqueIdProm;
  pipe->readDescriptor([&](const Error& error, Descriptor descriptor) {
    TP_THROW_ASSERT_IF(error) << error.what();
    uniqueIdProm.set_value(
        *reinterpret_cast<const ncclUniqueId*>(descriptor.metadata.c_str()));
  });
  ncclUniqueId uniqueId = uniqueIdProm.get_future().get();

  data.ncclComm = createNcclComm(/*rank=*/0, /*worldSize=*/2, uniqueId);
#endif

  std::promise<void> doneProm;
  serverPongPingNonBlock(
      std::move(pipe), numWarmUps, numRoundTrips, doneProm, data, measurements);

  doneProm.get_future().get();
  listener.reset();
  context->join();
}