in tensorpipe/benchmark/benchmark_pipe.cc [345:420]
// Runs the server side of the pipe benchmark.
//
// Steps:
// 1. Builds the expected and temporary payload/tensor buffers described by
//    `options`.
// 2. Registers `options.transport` and `options.channel` on a fresh Context
//    and listens on `options.address`.
// 3. Accepts exactly one pipe. When built with NCCL, it first receives the
//    NCCL unique ID over that pipe and creates the communicator as rank 0
//    of 2.
// 4. Hands the pipe to serverPongPingNonBlock() for kNumWarmUpRounds warm-up
//    rounds plus `options.numRoundTrips` round trips, and blocks until that
//    helper fulfills `doneProm`.
// 5. Closes the listener and joins the context before `data` and
//    `measurements` go out of scope.
static void runServer(const Options& options) {
  std::string addr = options.address;
  int numWarmUps = kNumWarmUpRounds;
  int numRoundTrips = options.numRoundTrips;

  Data data;
  data.numPayloads = options.numPayloads;
  data.payloadSize = options.payloadSize;
  for (size_t payloadIdx = 0; payloadIdx < options.numPayloads; payloadIdx++) {
    data.expectedPayload.push_back(createFullCpuData(options.payloadSize));
    // NOTE(review): the 0x42 fill presumably has to match the metadata the
    // client sends; confirm against the client side.
    data.expectedPayloadMetadata.push_back(
        std::string(options.metadataSize, 0x42));
    data.temporaryPayload.push_back(createEmptyCpuData(options.payloadSize));
  }

  data.numTensors = options.numTensors;
  data.tensorSize = options.tensorSize;
  data.tensorType = options.tensorType;
  for (size_t tensorIdx = 0; tensorIdx < options.numTensors; tensorIdx++) {
    data.expectedTensorMetadata.push_back(
        std::string(options.metadataSize, 0x42));
    if (options.tensorType == TensorType::kCpu) {
      data.expectedCpuTensor.push_back(createFullCpuData(options.tensorSize));
      data.temporaryCpuTensor.push_back(createEmptyCpuData(options.tensorSize));
    } else if (options.tensorType == TensorType::kCuda) {
      data.expectedCudaTensor.push_back(createFullCudaData(options.tensorSize));
      data.temporaryCudaTensor.push_back(
          createEmptyCudaData(options.tensorSize));
    } else {
      TP_THROW_ASSERT() << "Unknown tensor type";
    }
  }
  // Data holds a single CUDA stream. Create it once, after all CUDA buffers
  // are allocated. Creating it inside the loop would make numTensors streams
  // and keep only the last. As before, no stream is created when there are
  // no tensors.
  if (options.tensorType == TensorType::kCuda && options.numTensors > 0) {
    data.cudaStream = createCudaStream();
  }
  data.cudaSyncPeriod = options.cudaSyncPeriod;
  data.expectedMetadata = std::string(options.metadataSize, 0x42);

  Measurements measurements;
  measurements.reserve(options.numRoundTrips);

  std::shared_ptr<Context> context = std::make_shared<Context>();
  auto transportContext =
      TensorpipeTransportRegistry().create(options.transport);
  validateTransportContext(transportContext);
  context->registerTransport(0, options.transport, transportContext);

  auto channelContext = TensorpipeChannelRegistry().create(options.channel);
  validateChannelContext(channelContext);
  context->registerChannel(0, options.channel, channelContext);

  // Accept a single incoming pipe and hand it back to this thread.
  std::promise<std::shared_ptr<Pipe>> pipeProm;
  std::shared_ptr<Listener> listener = context->listen({addr});
  listener->accept([&](const Error& error, std::shared_ptr<Pipe> pipe) {
    TP_THROW_ASSERT_IF(error) << error.what();
    pipeProm.set_value(std::move(pipe));
  });
  std::shared_ptr<Pipe> pipe = pipeProm.get_future().get();

#if USE_NCCL
  // The first message on the pipe carries the raw bytes of an ncclUniqueId.
  std::promise<ncclUniqueId> uniqueIdProm;
  pipe->readDescriptor([&](const Error& error, Descriptor descriptor) {
    TP_THROW_ASSERT_IF(error) << error.what();
    // Reject short metadata instead of reading past the end of the buffer.
    TP_THROW_ASSERT_IF(descriptor.metadata.size() < sizeof(ncclUniqueId))
        << "Expected at least " << sizeof(ncclUniqueId)
        << " bytes of NCCL unique ID metadata, got "
        << descriptor.metadata.size();
    // Copy into a properly typed object rather than dereferencing the char
    // buffer as an ncclUniqueId (avoids alignment/aliasing issues).
    ncclUniqueId receivedId;
    descriptor.metadata.copy(
        reinterpret_cast<char*>(&receivedId), sizeof(receivedId));
    uniqueIdProm.set_value(receivedId);
  });
  ncclUniqueId uniqueId = uniqueIdProm.get_future().get();

  data.ncclComm = createNcclComm(/*rank=*/0, /*worldSize=*/2, uniqueId);
#endif

  // `data`, `measurements` and `doneProm` are passed by reference to an
  // asynchronous helper. They must outlive it, which the get() and join()
  // below guarantee.
  std::promise<void> doneProm;
  serverPongPingNonBlock(
      std::move(pipe), numWarmUps, numRoundTrips, doneProm, data, measurements);

  doneProm.get_future().get();
  listener.reset();
  context->join();
}