in include/MonotonicFixpointIterator.h [399:491]
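// Parallel fixpoint loop driven by a weak partial ordering (WPO) of the
// analyzed graph: each WPO node is scheduled onto the work queue once all of
// its scheduling predecessors have completed, as tracked by the per-node
// atomic counters allocated below.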
void run(const Domain& init) {
this->set_all_to_bottom(m_all_nodes);
Context context(init, m_all_nodes);
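// The Context carries the initial state and the per-head local iteration
// counts manipulated below (reset_local_iteration_count_for /
// increase_iteration_count_for) and consulted when extrapolating.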
std::unique_ptr<std::atomic<uint32_t>[]> wpo_counter(
new std::atomic<uint32_t>[m_wpo.size()]);
std::fill_n(wpo_counter.get(), m_wpo.size(), 0);
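// wpo_counter[i] counts how many scheduling predecessors of WPO node i have
// completed so far; node i becomes ready to run exactly when this count
// reaches m_wpo.get_num_preds(i).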
auto entry_idx = m_wpo.get_entry();
assert(m_wpo.get_num_preds(entry_idx) == 0);
// Prepare work queue.
auto wq = sparta::work_queue<uint32_t>(
[&context, &entry_idx, &wpo_counter, this](WPOWorkerState* worker_state,
uint32_t wpo_idx) {
std::atomic<uint32_t>& current_counter = wpo_counter[wpo_idx];
assert(current_counter == m_wpo.get_num_preds(wpo_idx));
current_counter = 0;
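// Reset our own counter before touching any successor counters; the note on
// point-wise updates below relies on this ordering when a component fails to
// stabilize and outer-predecessor counts are re-added.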
// Non-exit node.
if (!m_wpo.is_exit(wpo_idx)) {
this->analyze_vertex(&context, m_wpo.get_node(wpo_idx));
for (auto succ_idx : m_wpo.get_successors(wpo_idx)) {
std::atomic<uint32_t>& succ_counter = wpo_counter[succ_idx];
// Increment the successor's counter; push the successor onto the work
// queue once its counter matches its NumSchedPreds.
if (++succ_counter == m_wpo.get_num_preds(succ_idx)) {
worker_state->push_task(succ_idx);
}
}
return nullptr;
}
// Exit node.
// Check whether the component of this exit node has stabilized.
auto head_idx = m_wpo.get_head_of_exit(wpo_idx);
NodeId head = m_wpo.get_node(head_idx);
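// Stabilization of the component is detected at its exit node by recomputing
// the entry state of the matching head and checking whether it is subsumed
// by the previously recorded state.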
Domain* current_state = &this->m_entry_states[head];
Domain new_state = Domain::bottom();
this->compute_entry_state(&context, head, &new_state);
if (new_state.leq(*current_state)) {
// Component stabilized.
context.reset_local_iteration_count_for(head);
*current_state = std::move(new_state);
for (auto succ_idx : m_wpo.get_successors(wpo_idx)) {
std::atomic<uint32_t>& succ_counter = wpo_counter[succ_idx];
// Increment the successor's counter; push the successor onto the work
// queue once its counter matches its NumSchedPreds.
if (++succ_counter == m_wpo.get_num_preds(succ_idx)) {
worker_state->push_task(succ_idx);
}
}
} else {
// Component didn't stabilize.
this->extrapolate(context, head, current_state, new_state);
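// extrapolate() is expected to over-approximate the join of the old and new
// states (typically switching to widening after enough local iterations) so
// that the component's chain of entry states eventually stabilizes; the
// exact policy is defined elsewhere in the iterator.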
context.increase_iteration_count_for(head);
// Add NumOuterSchedPreds(v, wpo_idx) to the counter of each component
// node v (see the note below on why we add rather than set).
for (auto pred_pair : m_wpo.get_num_outer_preds(wpo_idx)) {
auto component_idx = pred_pair.first;
assert(component_idx != entry_idx);
std::atomic<uint32_t>& component_counter =
wpo_counter[component_idx];
// Push a component node onto the work queue once its counter matches
// its NumSchedPreds.
// Note: On page 10, https://dl.acm.org/ft_gateway.cfm?id=3371082
// suggests setting the counter *equal* to the number of predecessors
// outside the component. However, that is only correct when all counter
// updates of a scheduling step are performed together as a single
// atomic update. Since we update point-wise instead, we have to *add*
// the number of outer predecessors, and reset our own counter to 0
// before updating any other dependent counters.
if ((component_counter += pred_pair.second) ==
m_wpo.get_num_preds(component_idx)) {
worker_state->push_task(component_idx);
}
}
if (head_idx == entry_idx) {
// Handle the special case of a self-loop on the entry node: the entry
// node has num_preds == 0, and get_num_outer_preds omits nodes that
// have no outer scheduling predecessors, so we must manually push the
// entry node back onto the work queue when its component didn't
// stabilize.
worker_state->push_task(head_idx);
}
}
return nullptr;
},
m_num_thread,
/*push_tasks_while_running=*/true);
wq.add_item(m_wpo.get_entry());
wq.run_all();
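// Sanity check: once the queue drains, every counter must have been reset to
// zero; a non-zero counter would indicate a scheduling imbalance.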
for (uint32_t idx = 0; idx < m_wpo.size(); ++idx) {
assert(wpo_counter[idx] == 0);
}
}
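// Usage sketch (hypothetical, not part of this header): a client would
// construct a fixpoint iterator over its graph and kick off the analysis
// with an initial abstract value, e.g.
//
//   MyFixpointIterator fixpoint(cfg); // assumed client-defined subclass
//   fixpoint.run(Domain::top());      // invokes run() as defined above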