// maga_transformer/cpp/devices/testing/TestBase.h
#pragma once
#include <algorithm>
#include <arpa/inet.h>
#include <cstring>
#include <gtest/gtest.h>
#include <iostream>
#include <netinet/in.h>
#include <numeric>
#include <stdlib.h>
#include <sys/socket.h>
#include <torch/torch.h>
#include <unistd.h>
#ifdef ENABLE_FP8
#include <cuda_fp8.h>
#endif
#include "maga_transformer/cpp/devices/DeviceFactory.h"
#include "maga_transformer/cpp/core/torch_utils/BufferTorchUtils.h"
#include "maga_transformer/cpp/core/Buffer.h"
#include "maga_transformer/cpp/utils/Logger.h"
#include "maga_transformer/cpp/th_op/GptInitParameter.h"
#include "maga_transformer/cpp/cache/CacheManager.h"
#include "maga_transformer/cpp/utils/KVCacheUtils.h"
#include "maga_transformer/cpp/cache/BatchKVCacheResource.h"
#include "autil/EnvUtil.h"
using namespace rtp_llm;
static const std::string DEFAULT_DEVICE = "CPU";
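
// gtest helpers: assert that two vector-like containers have the same length and
// compare them element by element (exactly, or within an absolute error).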
#define ASSERT_VECTOR_EQUAL(x, y) \
    ASSERT_EQ(x.size(), y.size()) << "Vectors x and y are of unequal length"; \
    for (size_t i = 0; i < x.size(); ++i) { \
        ASSERT_EQ(x[i], y[i]) << "Vectors x and y differ at index " << i; \
    }

#define ASSERT_VECTOR_NEAR(x, y, abs_error) \
    ASSERT_EQ(x.size(), y.size()) << "Vectors x and y are of unequal length"; \
    for (size_t i = 0; i < x.size(); ++i) { \
        ASSERT_NEAR(x[i], y[i], abs_error) << "Vectors x and y differ at index " << i; \
    }
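
// Base fixture shared by engine tests: initializes logging, seeds torch, and
// resolves the test data directory from the TEST_SRCDIR / TEST_WORKSPACE /
// TEST_BINARY environment variables set by the bazel test runner.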
class EngineBaseTest: public ::testing::Test {
public:
    void SetUp() override {
        rtp_llm::initLogger();
        initTestDataDir();
        torch::manual_seed(114514);
        setenv("SAMPLE_TEST", "1", 1);
    }

    virtual void initTestDevices() {}

    void initTestDataDir() {
        const auto test_src_dir = getenv("TEST_SRCDIR");
        const auto test_work_space = getenv("TEST_WORKSPACE");
        const auto test_binary = getenv("TEST_BINARY");
        if (!(test_src_dir && test_work_space && test_binary)) {
            std::cerr << "Unable to retrieve TEST_SRCDIR / TEST_WORKSPACE / TEST_BINARY env!" << std::endl;
            abort();
        }
        std::string test_binary_str = std::string(test_binary);
        RTP_LLM_CHECK(*test_binary_str.rbegin() != '/');
        size_t filePos = test_binary_str.rfind('/');
        test_data_path_ = std::string(test_src_dir) + "/" + std::string(test_work_space) + "/"
                        + test_binary_str.substr(0, filePos) + "/";
        std::cout << "test_src_dir [" << test_src_dir << "]" << std::endl;
        std::cout << "test_work_space [" << test_work_space << "]" << std::endl;
        std::cout << "test_binary [" << test_binary << "]" << std::endl;
        std::cout << "test using data path [" << test_data_path_ << "]" << std::endl;
    }

    void TearDown() override {}

protected:
    std::string test_data_path_;
};
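
// Fixture for device-level tests: creates the default rtp_llm device with
// configurable reserved device/host memory, and provides helpers for
// Buffer/torch::Tensor conversion, comparison, and KV cache allocation.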
class DeviceTestBase: public EngineBaseTest {
public:
    void SetUp() override {
        EngineBaseTest::SetUp();
        initTestDevices();
        torch::manual_seed(114514);
    }

    virtual void initTestDevices() {
        autil::EnvUtil::setEnv("DEVICE_RESERVE_MEMORY_BYTES", std::to_string(device_reserve_memory_size_));
        autil::EnvUtil::setEnv("HOST_RESERVE_MEMORY_BYTES", std::to_string(host_reserve_memory_size_));
        rtp_llm::DeviceFactory::initDevices(rtp_llm::GptInitParameter());
        device_ = rtp_llm::DeviceFactory::getDefaultDevice();
    }

    void TearDown() override {}

protected:
    template <typename T>
    void printBuffer(const rtp_llm::Buffer& buffer, const std::string& hint = "") {
        auto values = getBufferValues<T>(buffer);
        for (size_t i = 0; i < values.size(); i++) {
            std::cout << values[i] << " ";
        }
        std::cout << " " << hint << std::endl;
    }
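
    // Buffer creation helpers: allocate an empty buffer of a given shape/type, or
    // build one from a host-side std::vector, raw pointer, or torch::Tensor,
    // on either the device or the host.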
    rtp_llm::BufferPtr createBuffer(const std::vector<size_t>& shape, rtp_llm::DataType type,
                                    rtp_llm::AllocationType alloc_type = rtp_llm::AllocationType::DEVICE)
    {
        if (alloc_type == rtp_llm::AllocationType::DEVICE) {
            return device_->allocateBuffer({type, shape, rtp_llm::AllocationType::DEVICE}, {});
        } else {
            return device_->allocateBuffer({type, shape, rtp_llm::AllocationType::HOST}, {});
        }
    }

    template <typename T>
    rtp_llm::BufferPtr createBuffer(const std::vector<size_t>& shape, const std::vector<T>& data,
                                    rtp_llm::AllocationType alloc_type = rtp_llm::AllocationType::DEVICE)
    {
        const auto num_elements = std::accumulate(shape.begin(), shape.end(), size_t(1), std::multiplies<size_t>());
        RTP_LLM_CHECK(num_elements == data.size());
        if (alloc_type == rtp_llm::AllocationType::DEVICE) {
            return createDeviceBuffer<T>(shape, data.data());
        } else {
            return createHostBuffer<T>(shape, data.data());
        }
    }

    template <typename T>
    rtp_llm::BufferPtr createHostBuffer(const std::vector<size_t>& shape, const T* data) {
        return createHostBuffer<T>(shape, static_cast<const void*>(data));
    }

    template <typename T>
    rtp_llm::BufferPtr createHostBuffer(const std::vector<size_t>& shape, const void* data) {
        auto buffer = device_->allocateBuffer({rtp_llm::getTensorType<T>(), shape, rtp_llm::AllocationType::HOST}, {});
        if (data && (buffer->size() > 0)) {
            memcpy(buffer->data(), data, sizeof(T) * buffer->size());
        }
        device_->syncAndCheck();
        return buffer;
    }

    template <typename T>
    rtp_llm::BufferPtr createDeviceBuffer(const std::vector<size_t>& shape, const void* data) {
        auto host_buffer = createHostBuffer<T>(shape, data);
        auto buffer = device_->allocateBuffer({rtp_llm::getTensorType<T>(), shape, rtp_llm::AllocationType::DEVICE}, {});
        if (data && (buffer->size() > 0)) {
            device_->copy({*buffer, *host_buffer});
        }
        device_->syncAndCheck();
        return buffer;
    }

    template <typename T>
    rtp_llm::BufferPtr createDeviceBuffer(torch::Tensor tensor) {
        const auto targetType = c10::CppTypeToScalarType<T>::value;
        if (tensor.scalar_type() != targetType) {
            tensor = tensor.to(targetType);
        }
        return tensorToBuffer(tensor);
    }
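
    // Readback and comparison helpers: copy a (possibly device-resident) buffer
    // to host memory and compare or return its values element by element.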
    template <typename T>
    void assertBufferValueEqual(const rtp_llm::Buffer& buffer, const std::vector<T>& expected) {
        ASSERT_EQ(buffer.size(), expected.size());
        auto comp_buffer = device_->allocateBuffer(
            {buffer.type(), buffer.shape(), rtp_llm::AllocationType::HOST}
        );
        device_->copy({*comp_buffer, buffer});
        device_->syncAndCheck();
        for (size_t i = 0; i < buffer.size(); i++) {
            printf("i=%zu, buffer[i] = %f, expected[i] = %f\n", i,
                   float((comp_buffer->data<T>())[i]), float(expected[i]));
            ASSERT_EQ((comp_buffer->data<T>())[i], expected[i]);
        }
    }

    template <typename T>
    std::vector<T> getBufferValues(const rtp_llm::Buffer& buffer) {
        std::vector<T> values(buffer.size());
        device_->syncAndCheck();
        if (buffer.where() == rtp_llm::MemoryType::MEMORY_GPU) {
            auto host_buffer = createHostBuffer<T>(buffer.shape(), nullptr);
            device_->copy({*host_buffer, buffer});
            device_->syncAndCheck();
            memcpy(values.data(), host_buffer->data(), sizeof(T) * buffer.size());
        } else {
            memcpy(values.data(), buffer.data(), sizeof(T) * buffer.size());
        }
        return values;
    }
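
    // torch::Tensor <-> rtp_llm::Buffer conversions. Quantized tensors carry
    // their per-channel scales and zero points along into the buffer.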
    rtp_llm::BufferPtr tensorToBuffer(const torch::Tensor& tensor,
                                      rtp_llm::AllocationType alloc_type = rtp_llm::AllocationType::DEVICE)
    {
        if (tensor.is_quantized()) {
            return tensorToBuffer(tensor,
                                  tensor.q_per_channel_scales().to(torch::kHalf),
                                  tensor.q_per_channel_zero_points().to(torch::kHalf));
        }
        RTP_LLM_CHECK(tensor.is_cpu());
        auto buffer = rtp_llm::torchTensor2Buffer(tensor);
        if (alloc_type == rtp_llm::AllocationType::DEVICE) {
            auto device_buffer = device_->allocateBuffer(
                {buffer->type(), buffer->shape(), rtp_llm::AllocationType::DEVICE}
            );
            device_->copy({*device_buffer, *buffer});
            device_->syncAndCheck();
            printf("created device buffer from tensor at %p with data=%p\n", device_buffer.get(), device_buffer->data());
            return device_buffer;
        } else {
            return buffer;
        }
    }

    rtp_llm::BufferPtr tensorToBuffer(const torch::Tensor& tensor,
                                      const torch::Tensor& scales,
                                      const torch::Tensor& zeros,
                                      rtp_llm::AllocationType alloc_type = rtp_llm::AllocationType::DEVICE)
    {
        auto buffer = rtp_llm::torchTensor2Buffer(tensor, scales, zeros);
        if (alloc_type == rtp_llm::AllocationType::DEVICE) {
            auto device_buffer = device_->allocateBufferLike(*buffer);
            device_->copy({*device_buffer, *buffer});
            device_->syncAndCheck();
            printf("created device buffer from tensor at %p with data=%p\n", device_buffer.get(), device_buffer->data());
            return device_buffer;
        } else {
            return buffer;
        }
    }

    torch::Tensor bufferToTensor(const rtp_llm::Buffer& buffer, rtp_llm::DeviceBase* device = nullptr) {
        if (!device) {
            device = device_;
        }
        auto host_buffer = device->allocateBuffer(
            {buffer.type(), buffer.shape(), rtp_llm::AllocationType::HOST}
        );
        device->copy({*host_buffer, buffer});
        device->syncAndCheck();
        return torch::from_blob(
            host_buffer->data(), bufferShapeToTorchShape(buffer),
            c10::TensorOptions().device(torch::Device(torch::kCPU))
                .dtype(dataTypeToTorchType(buffer.type()))
        ).clone();
    }
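
    // Allocates KV cache blocks for a batch through the CacheManager and returns
    // the per-request block id table ([batch_size, block_num], int32, host).
    // When kvCache has shape [layer_num, batch, 2, max_pad_seq, dim], its contents
    // are copied block by block into the allocated cache.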
    rtp_llm::BufferPtr allocateKVBlocks(const rtp_llm::CacheConfig& cache_config,
                                        const std::vector<int32_t>& input_lengths,
                                        torch::Tensor& kvCache)
    {
        if (!cache_manager_) {
            cache_manager_ = std::make_shared<rtp_llm::CacheManager>(cache_config, device_);
        }
        auto max_seq_len = *std::max_element(input_lengths.begin(), input_lengths.end());
        max_seq_len = (max_seq_len == 0) ? 1 : max_seq_len;
        const auto tokensPerBlock = cache_config.seq_size_per_block;
        const auto batch_layer_kv_block_num = ((max_seq_len + tokensPerBlock - 1) / tokensPerBlock + 1);
        const auto batch_size = input_lengths.size();
        auto kv_cache_block_id = device_->allocateBuffer({
            rtp_llm::DataType::TYPE_INT32, {batch_size, batch_layer_kv_block_num}, rtp_llm::AllocationType::HOST
        });
        rtp_llm::BatchKVCacheResource batch_kv_cache;
        for (auto i = 0; i < batch_size; i++) {
            auto [success, kv_cache] = cache_manager_->malloc({0, batch_layer_kv_block_num, true});
            EXPECT_TRUE(success);
            batch_kv_cache.pushBack(kv_cache);
        }
        for (auto i = 0; i < batch_size; i++) {
            std::memcpy((*kv_cache_block_id)[i].data(),
                        batch_kv_cache.batch_block_id[i].data(),
                        batch_kv_cache.batch_block_id[i].size() * sizeof(int));
            if (kvCache.dim() == 5) {
                // kvCache layout: [layer_num, batch, 2 (k/v), max_pad_seq, dim]
                auto max_pad_seq = kvCache.sizes()[3];
                auto k_indexs = batch_kv_cache.batch_block_id[i];
                for (auto k = 0; k < (max_pad_seq / cache_config.seq_size_per_block); k++) {
                    auto block_start = k * cache_config.seq_size_per_block;
                    auto block_end = block_start + cache_config.seq_size_per_block;
                    auto kblock = kvCache.index(
                        {torch::indexing::Slice(),
                         i,
                         0,
                         torch::indexing::Slice(block_start, block_end),
                         torch::indexing::Slice()}).contiguous();
                    auto vblock = kvCache.index(
                        {torch::indexing::Slice(),
                         i,
                         1,
                         torch::indexing::Slice(block_start, block_end),
                         torch::indexing::Slice()}).contiguous();
                    auto kblock_buffer = rtp_llm::torchTensor2Buffer(kblock);
                    auto vblock_buffer = rtp_llm::torchTensor2Buffer(vblock);
                    cache_manager_->setKVBlockValue(k_indexs[k],
                                                    *kblock_buffer,
                                                    *vblock_buffer);
                }
            }
        }
        return kv_cache_block_id;
    }
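
    // Asserts that two tensors are element-wise close (torch::allclose) after
    // promoting both to the wider dtype; falls back to the fixture defaults
    // rtol_/atol_ when the tolerances are left at 0, and dumps both tensors plus
    // absolute/relative diffs on failure.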
    void assertTensorClose(const torch::Tensor& a, const torch::Tensor& b,
                           double rtol = 0, double atol = 0) {
        auto a_cmp = a;
        auto b_cmp = b;
        rtol = rtol ? rtol : rtol_;
        atol = atol ? atol : atol_;
        ASSERT_TRUE(a.is_floating_point() == b.is_floating_point());
        if (a_cmp.dtype() != b_cmp.dtype()) {
            auto cmp_type = (a_cmp.dtype().itemsize() > b_cmp.dtype().itemsize()) ?
                            a_cmp.dtype() : b_cmp.dtype();
            a_cmp = a_cmp.to(cmp_type);
            b_cmp = b_cmp.to(cmp_type);
        }
        a_cmp = a_cmp.squeeze();
        b_cmp = b_cmp.squeeze();
        const auto close = torch::allclose(a_cmp, b_cmp, rtol, atol);
        if (!close) {
            std::cout << "assert tensor close failed!" << std::endl;
            std::cout << "rtol: " << rtol << std::endl;
            std::cout << "atol: " << atol << std::endl;
            std::cout << "a: " << a << std::endl;
            std::cout << "b: " << b << std::endl;
            std::cout << "abs diff: " << torch::abs(a_cmp - b_cmp) << std::endl;
            std::cout << "rel diff: " << torch::abs(a_cmp - b_cmp) / torch::abs(a_cmp) << std::endl;
            ASSERT_TRUE(false);
        }
    }
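
    // Binds a TCP socket to port 0 so the kernel picks a free port, then closes
    // the socket and returns that port number.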
    size_t getFreePort() {
        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
        EXPECT_TRUE(sockfd >= 0);
        struct sockaddr_in addr;
        memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        addr.sin_port = 0;
        if (bind(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
            EXPECT_TRUE(false);
        }
        socklen_t addr_len = sizeof(addr);
        if (getsockname(sockfd, (struct sockaddr*)&addr, &addr_len) < 0) {
            EXPECT_TRUE(false);
        }
        close(sockfd);
        return ntohs(addr.sin_port);
    }

    const std::string& getTestDataPath() const {
        return test_data_path_;
    }
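
    // Builds a random CPU tensor of the requested shape and dtype; for quantized
    // dtypes the tensor is per-channel quantized along its last dimension with
    // random scales and zero zero-points. A non-zero seed makes the result
    // deterministic.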
    torch::Tensor randTensor(at::IntArrayRef shape,
                             torch::Dtype dtype,
                             int64_t seed = 0) {
        torch::TensorOptions float_options =
            torch::TensorOptions(torch::kFloat).device(torch::Device(torch::kCPU));
        torch::TensorOptions half_tensor_options =
            torch::TensorOptions(torch::kFloat16).device(torch::Device(torch::kCPU));
        auto generator = at::detail::createCPUGenerator();
        if (seed != 0) {
            generator = at::detail::createCPUGenerator(seed);
        }
        auto output = torch::rand(shape, generator, float_options);
        if (c10::isQIntType(dtype)) {
            int axis = output.dim() - 1;
            auto scales = torch::rand(output.sizes()[axis], half_tensor_options);
            auto zeros = torch::zeros(output.sizes()[axis]);
            output = at::quantize_per_channel(output, scales, zeros, axis, dtype);
        } else {
            output = output.to(dtype);
        }
        return output;
    }
protected:
    rtp_llm::DeviceBase* device_ = nullptr;
    double rtol_ = 1e-03;
    double atol_ = 1e-03;
    rtp_llm::CacheManagerPtr cache_manager_;
    size_t device_reserve_memory_size_ = 1024L * 1024 * 1024; // 1GB
    size_t host_reserve_memory_size_ = 1L * 1024 * 1024 * 1024; // 1GB
};
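
// Registers a gtest TEST_F that simply invokes a member test method of the
// fixture, forwarding any extra arguments, e.g. (hypothetical fixture/method
// names): RTP_LLM_RUN_DEVICE_TEST(MyDeviceTest, testCopy);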
#define RTP_LLM_RUN_DEVICE_TEST(test_class, case_name, ...) \
    TEST_F(test_class, case_name) { \
        case_name(__VA_ARGS__); \
    }