in src/job.cc [48:474]
bool Job::run_job() {
/*******************
* Set up Targets
*******************/
// TODO move target setup to Profile or Target
// TODO explore faster ways to set up files (async, copying files for multiple targets etc)
// buffers to use when setting up targets
const size_t fill_buf_size = 64*1024*1024;
char * fill_buf = (char*)malloc(fill_buf_size*sizeof(char));
// fill with ascending bytes
for (unsigned i = 0; i < fill_buf_size; ++i) fill_buf[i] = (char)(i&0xFF);
// and create a zero buffer
char * zero_buf = (char*)calloc(fill_buf_size, sizeof(char));
// set up target files
v_printf("Setting up target files\n");
for (auto& target : options->targets) {
/*
size_t block_sz;
int rioctl = ioctl(fd, BLKSSZGET, &block_sz);
if (rioctl == -1) {
perror("ioctl error");
}
target->sector_size = block_sz;
*/
size_t sector_size = target->sector_size;
// check alignment if target is going to be opened with O_DIRECT
if (target->open_flags & O_DIRECT && (
(target->block_size & (sector_size - 1)) || // -b
(target->stride & (sector_size - 1)) || // -s and -r
(target->thread_offset & (sector_size - 1)) // -T
)) {
fprintf(stderr, "O_DIRECT specified, but block size, stride or thread stride "
"(-b, -s, -r, -T) argument isn't block aligned!\n");
return false;
}
// how many threads will operate on this file?
unsigned int no_threads =
options->use_total_threads ? options->total_threads : target->threads_per_target;
// what's the highest offset we can write to without going over the file size?
off_t max_offset = target->max_size - target->base_offset - target->block_size;
// check that none of our starting offsets for threads would be past the max size
if (max_offset < target->thread_offset*(no_threads-1)) {
fprintf(stderr,
"File setup failed; I/O offset would overwrite end of file.\n"
"Solution: reduce -T (more overlap between threads), or -t or -F (less "
"threads per file), or increase file size (-c)\n");
return false;
}
// now open the file and set it up
int fd;
if (target->create_file) {
// remove the file first if it already exists
fd = open(target->path.c_str(), O_WRONLY | O_SYNC, 0664);
int rresult = remove(target->path.c_str());
if (rresult && errno != ENOENT) {
fprintf(stderr, "Failed to remove old file %s\n", target->path.c_str());
#ifdef ENABLE_DEBUG
perror("remove old file failed");
#endif
return false;
}
// create the file
fd = open(target->path.c_str(), O_CREAT | O_EXCL | O_WRONLY | O_SYNC, 0664);
} else {
continue;
}
if (fd == -1) {
fprintf(stderr, "Failed to open file %s\n", target->path.c_str());
#ifdef ENABLE_DEBUG
perror("open file failed");
#endif
return false;
}
// lseek to the base offset
off_t lresult = lseek(fd, target->base_offset, SEEK_SET);
if (lresult != target->base_offset) {
fprintf(stderr, "Failed to setup file %s\n", target->path.c_str());
#ifdef ENABLE_DEBUG
perror("lseek to base offset failed");
#endif
}
// fill with ascending bytes or zeros?
char * buf_to_use = target->zero_buffers ? zero_buf : fill_buf;
// fill the file up to its max size
off_t remaining_bytes = target->max_size - target->base_offset;
v_printf(" Laying out \"%s\"\n", target->path.c_str());
while(remaining_bytes) {
// how much to write on this loop
size_t nbytes = fill_buf_size < remaining_bytes ? fill_buf_size : remaining_bytes;
ssize_t wresult = write(fd, buf_to_use, nbytes);
if (wresult != nbytes) {
fprintf(stderr, "Failed to setup file %s\n", target->path.c_str());
#ifdef ENABLE_DEBUG
perror("Write error");
fprintf(stderr, "Write returned %ld\n", wresult);
#endif
return false;
}
remaining_bytes -= nbytes;
}
close(fd);
}
free(fill_buf);
free(zero_buf);
// get device name and scheduler for each target
for (auto& target : options->targets) {
struct stat buf = {0};
int err = stat(target->path.c_str(), &buf);
// use appropriate device id (st_dev != st_rdev if target is a device)
target->device = options->sys_info->device_from_id(buf.st_rdev ? buf.st_rdev : buf.st_dev);
target->scheduler = options->sys_info->scheduler_from_device(target->device);
}
/***********************
* Create ThreadParams
***********************/
// initialize results pointer
results = std::make_shared<JobResults>();
// prefix with a v to hopefully avoid confusion in this large function
std::vector<std::shared_ptr<ThreadParams>> thread_params;
// Generic initialization for each thread param
for (unsigned int i = 0, id = 0; i < options->total_threads; ++i) {
auto th = std::make_shared<ThreadParams>();
// create results
auto th_results = std::make_shared<ThreadResults>();
// add to thread and job
th->results = th_results;
results->thread_results.push_back(th_results);
// tell it what Job it's serving
th->job = this;
// set absolute id
th->thread_id = th->results->thread_id = id++;
th->job_options = options;
th->io_manager = options->io_manager;
// give it pointers to job synchronization structures
th->run_threads = &run_threads;
th->record_results = &record_results;
th->thread_error = &thread_error;
// push to the vector
thread_params.push_back(th);
}
// count total overlap operations for io engine
unsigned int total_overlap = 0;
// index into thread_params for -t
unsigned int index = 0;
for (auto& target : options->targets) {
// if -F, we want to go through all threads and add this target to each of them
// if -t, we want to add this target to just some of them
unsigned int loop_limit =
options->use_total_threads ? options->total_threads : target->threads_per_target;
for (
unsigned int inner_index = 0;
inner_index < loop_limit;
++inner_index
) {
total_overlap += target->overlap;
// get the thread param
std::shared_ptr<ThreadParams> th;
if (options->use_total_threads) {
// -F, just use the inner index as it will go through all the threads
th = thread_params[inner_index];
} else {
// -t, increment the outer index;
// when the outer loop ends we will have assigned each thread one target
th = thread_params[index++];
}
// Create a new target data for this target
auto t_data = std::make_shared<TargetData>();
t_data->target = target;
// Create a new target results for this target
auto t_results = std::make_shared<TargetResults>();
t_results->target = target;
// Tell the target data who its target results is
t_data->results = t_results;
// And who its thread is
t_data->thread = th;
// give the target data and results to the thread params and its results struct
th->targets.push_back(t_data);
th->results->target_results.push_back(t_results);
// tell it who it is - this will match the absolute id if -F is specified
// this operation is duplicated many times for -F but it's fine
th->rel_thread_id = inner_index;
// just a sanity check
if (options->use_total_threads) {
assert(th->rel_thread_id == th->thread_id);
}
}
}
// Maps of cpu ids to usage stats
// each vector in these two contains absolute amount of time for user, nice, kernel, idle
std::map<unsigned int, std::vector <double> > cpu_stats_init;
std::map<unsigned int, std::vector <double> > cpu_stats_end;
// start io_manager
if (!options->io_manager->start(total_overlap)) {
fprintf(stderr, "io engine failed to start\n");
return false;
}
/*****************
* Start threads
*****************/
v_printf("Starting %u threads... ", options->total_threads);
fflush(stdout);
// we'll use this shared counter to keep track of which threads have completed initialization
thread_counter = 0;
// create a cpuset of the size needed for the number of cpus in the system
cpu_set_t * cpu_set = CPU_ALLOC(options->sys_info->cpuhi + 1); // highest cpu id + 1
size_t cpu_set_size = CPU_ALLOC_SIZE(options->sys_info->cpuhi + 1);
auto cpuit = options->sys_info->affinity_cpus.begin();
for (auto& t : thread_params) {
int err = pthread_create(&t->thread_handle, NULL, _thread_func, static_cast<void *>(t.get()));
if (err) {
perror("Couldn't create pthread");
return false;
}
// cpu affinity
if (!options->disable_affinity) {
// set the bit corresponding to the next cpu to affinitize to
CPU_ZERO_S(cpu_set_size, cpu_set);
CPU_SET_S(*cpuit, cpu_set_size, cpu_set);
// actually set the affinity
err = pthread_setaffinity_np(t->thread_handle, cpu_set_size, (const cpu_set_t*)cpu_set);
if (err) {
perror("Couldn't affinitize pthread");
return false;
}
// affinitize round-robin by default; so loop back to the start
++cpuit;
if (cpuit == options->sys_info->affinity_cpus.end()) {
cpuit = options->sys_info->affinity_cpus.begin();
}
}
}
CPU_FREE(cpu_set);
// sleep on thread initialization, with a timeout in case of errors
std::cv_status timeout_status; // for getting result of wait_for
std::chrono::milliseconds init_timeout(1); // the timeout for checking errors
int timeout_counter = 10000; // actual timeout in seconds for initialization
std::unique_lock<std::mutex> thread_lock(thread_mutex);
// loop as long as all threads haven't been initialized
while(thread_counter < options->total_threads) {
timeout_status = thread_cv.wait_for(thread_lock, init_timeout);
// if an error happened, get out
if (thread_error) break;
if (timeout_status == std::cv_status::timeout) {
--timeout_counter; // decrement the timeout counter - a second has passed
}
if (timeout_counter <= 0) {
fprintf(stderr, "Thread initialization timed out!\n");
return false;
}
}
thread_lock.unlock();
// check for error
if (thread_error) {
fprintf(stderr, "Error during thread initialization!\n");
return false;
}
v_printf("All threads initialized\n");
/*************
* Warmup
*************/
if(options->warmup_time) {
v_printf("Warming up for %u second%s\n", options->warmup_time, options->warmup_time > 1 ? "s" : "");
std::chrono::seconds warmupdur(options->warmup_time);
// sleep until we're either woken up early (due to error) or the warmup is over
std::unique_lock<std::mutex> thread_warmup_lock(thread_mutex);
timeout_status = thread_error_cv.wait_for(thread_warmup_lock, warmupdur);
thread_warmup_lock.unlock();
if (timeout_status == std::cv_status::no_timeout || thread_error) {
fprintf(stderr, "Error during warmup phase!\n");
return false;
}
v_printf("Finished warming up; main test will run for %u second%s\n", options->duration, options->duration > 1 ? "s" : "");
} else {
v_printf("Performing main test for %u second%s\n", options->duration, options->duration > 1 ? "s" : "");
}
/*************
* Duration
*************/
// measure initial processor times
cpu_stats_init = options->sys_info->get_cpu_stats();
// get start time
options->start_time_ns = PerfClock::get_time_ns();
options->start_time_us = options->start_time_ns/1000;
options->start_time_ms = options->start_time_us/1000;
std::chrono::seconds maindur(options->duration);
// sleep until we're either woken up early (due to error) or the duration is over
std::unique_lock<std::mutex> thread_duration_lock(thread_mutex);
// start recording data
record_results = true;
// sleep
timeout_status = thread_error_cv.wait_for(thread_duration_lock, maindur);
// stop recording data
record_results = false;
thread_duration_lock.unlock();
if (timeout_status == std::cv_status::no_timeout || thread_error) {
fprintf(stderr, "Error during main test!\n");
return false;
}
// measure end processor times
cpu_stats_end = options->sys_info->get_cpu_stats();
// TODO cooldown (-C)
/*
*************
* Cleanup
*************
*/
// kill threads
run_threads = false;
// block on threads finishing
for (auto& t : thread_params) {
pthread_join(t->thread_handle, NULL);
}
// convert processor times to processor usage percentages
results->cpu_usage_percentages.clear();
for (auto& c: cpu_stats_init) {
// resize the vector
results->cpu_usage_percentages[c.first].resize(5);
// copy the init and end vectors for this cpu to make this bit more readable
std::vector<double> init, end;
init = cpu_stats_init[c.first];
end = cpu_stats_end[c.first];
// differences between end and init will give the actual time in each state
// remember 0 = user, 1 = nice, 2 = kernel, 3 = idle, 4 = iowait
double total_time = (end[0]+end[1]+end[2]+end[3]+end[4]) - (init[0]+init[1]+init[2]+init[3]+init[4]);
// total usage not including iowait or idle time
double nonidle = (end[0]+end[1]+end[2]) - (init[0]+init[1]+init[2]);
double user = (end[0]+end[1]) - (init[0]+init[1]);
double kernel = (end[2]) - (init[2]);
double iowait = (end[4]) - (init[4]);
double idle = (end[3]) - (init[3]);
results->cpu_usage_percentages[c.first][0] = nonidle/total_time;
results->cpu_usage_percentages[c.first][1] = user/total_time;
results->cpu_usage_percentages[c.first][2] = kernel/total_time;
results->cpu_usage_percentages[c.first][3] = iowait/total_time;
results->cpu_usage_percentages[c.first][4] = idle/total_time;
}
v_printf("Job done\n");
return true;
}