in tensorpipe/benchmark/benchmark_pipe.cc [639:713]
// Runs the client side of the pipe benchmark.
//
// Builds the expected and scratch payload/tensor buffers described by
// `options`, creates a Context with the single requested transport and
// channel, connects a Pipe to `options.address`, and then drives the
// non-blocking ping-pong loop, blocking until it reports completion.
static void runClient(const Options& options) {
  int numWarmUps = kNumWarmUpRounds;
  int numRoundTrips = options.numRoundTrips;

  // Every metadata field (per payload, per tensor and message-level) uses the
  // same fixed filler byte, so build it once and copy it where needed.
  const std::string metadata(options.metadataSize, 0x42);

  Data data;
  data.numPayloads = options.numPayloads;
  data.payloadSize = options.payloadSize;
  for (size_t payloadIdx = 0; payloadIdx < options.numPayloads; payloadIdx++) {
    data.expectedPayload.push_back(createFullCpuData(options.payloadSize));
    data.expectedPayloadMetadata.push_back(metadata);
    data.temporaryPayload.push_back(createEmptyCpuData(options.payloadSize));
  }

  data.numTensors = options.numTensors;
  data.tensorSize = options.tensorSize;
  data.tensorType = options.tensorType;
  // `data` holds a single CUDA stream shared by all CUDA tensors. Create it
  // once up front instead of once per tensor; recreating it inside the loop
  // only replaced the previous stream on every iteration.
  if (data.tensorType == TensorType::kCuda && options.numTensors > 0) {
    data.cudaStream = createCudaStream();
  }
  for (size_t tensorIdx = 0; tensorIdx < options.numTensors; tensorIdx++) {
    data.expectedTensorMetadata.push_back(metadata);
    if (data.tensorType == TensorType::kCpu) {
      data.expectedCpuTensor.push_back(createFullCpuData(options.tensorSize));
      data.temporaryCpuTensor.push_back(createEmptyCpuData(options.tensorSize));
    } else if (data.tensorType == TensorType::kCuda) {
      data.expectedCudaTensor.push_back(createFullCudaData(options.tensorSize));
      data.temporaryCudaTensor.push_back(
          createEmptyCudaData(options.tensorSize));
    } else {
      TP_THROW_ASSERT() << "Unknown tensor type";
    }
  }
  data.cudaSyncPeriod = options.cudaSyncPeriod;
  data.expectedMetadata = metadata;

  // cudaSyncPeriod is used as a divisor just below. Integer division by zero
  // is undefined behavior (usually a SIGFPE crash), so reject it explicitly.
  if (data.cudaSyncPeriod == 0) {
    TP_THROW_ASSERT() << "cudaSyncPeriod must be greater than zero";
  }

  // Pre-size the measurement buffers: one CPU sample per round trip and,
  // presumably, one CUDA sample per sync period (TODO confirm against
  // clientPingPongNonBlock).
  MultiDeviceMeasurements measurements;
  measurements.cpu.reserve(options.numRoundTrips);
  measurements.cuda.reserve(options.numRoundTrips / data.cudaSyncPeriod);

  // Only the transport and channel named in `options` are registered, so the
  // pipe has exactly one choice for each.
  std::shared_ptr<Context> context = std::make_shared<Context>();
  auto transportContext =
      TensorpipeTransportRegistry().create(options.transport);
  validateTransportContext(transportContext);
  context->registerTransport(0, options.transport, transportContext);
  auto channelContext = TensorpipeChannelRegistry().create(options.channel);
  validateChannelContext(channelContext);
  context->registerChannel(0, options.channel, channelContext);

  std::shared_ptr<Pipe> pipe = context->connect(options.address);

#if USE_NCCL
  // The client generates the NCCL unique id and sends it to the peer as the
  // metadata of a bootstrap message. After the write completes, it joins the
  // communicator as rank 1 of 2.
  ncclUniqueId uniqueId;
  TP_NCCL_CHECK(ncclGetUniqueId(&uniqueId));
  Message message;
  message.metadata = std::string(
      reinterpret_cast<char*>(&uniqueId),
      reinterpret_cast<char*>(&uniqueId) + sizeof(ncclUniqueId));
  std::promise<void> promise;
  // Capturing by reference is safe: we block on the future right below, so
  // `promise` outlives the callback.
  pipe->write(std::move(message), [&](const Error& error) {
    TP_THROW_ASSERT_IF(error) << error.what();
    promise.set_value();
  });
  promise.get_future().get();
  data.ncclComm = createNcclComm(/*rank=*/1, /*worldSize=*/2, uniqueId);
#endif // USE_NCCL

  // The ping-pong driver receives the pipe by move and is handed `doneProm`.
  // We block on its future, then join the context.
  std::promise<void> doneProm;
  clientPingPongNonBlock(
      std::move(pipe), numWarmUps, numRoundTrips, doneProm, data, measurements);
  doneProm.get_future().get();
  context->join();
}