// brute_force_all_pairs_similarity_with_vector_reference()
//
// From src/toolkits/sparse_similarity/neighbor_search.hpp [64:426]

// Brute-force all-pairs similarity between every (reference row, query row)
// pair, using a dense in-memory representation of blocks of query rows.
//
// The query rows are processed in blocks sized by max_memory_usage: each
// block of sparse query rows is densified into one contiguous buffer with a
// dimension-major layout, so that streaming a single reference (dimension,
// value) entry against all query rows in the block touches consecutive
// memory and can be auto-vectorized.  The reference rows are then read in
// small batches and processed against the block in parallel.
//
// Parameters:
//   reference_data      - sparse reference rows as (dimension, value) pairs.
//   reference_item_info - per-row precomputed similarity state; must have
//                         exactly one entry per reference row.
//   query_data          - sparse query rows, same format as reference_data.
//   query_item_info     - per-row precomputed state for the query rows.
//   similarity          - similarity object supplying
//                         update_interaction_unsafe(...) and
//                         finalize_interaction(...).
//   process_function    - callback invoked as
//                         process_function(ref_idx, query_idx, value) for
//                         every pair not suppressed by skip_pair.  May be
//                         called concurrently from multiple threads for
//                         distinct ref_idx ranges.
//   num_dimensions      - number of columns/dimensions; must be nonzero.
//   max_memory_usage    - approximate byte budget used to size the dense
//                         query block.
//   skip_pair           - predicate; skip_pair(query_idx, ref_idx) == true
//                         suppresses output for that pair.
//   query_mask          - optional bitset over query rows; when non-null,
//                         only rows with a set bit are processed.
void brute_force_all_pairs_similarity_with_vector_reference(
    std::shared_ptr<sarray<std::vector<std::pair<size_t, double> > > > reference_data,
    const std::vector<item_processing_info<SimilarityType> >& reference_item_info,
    std::shared_ptr<sarray<std::vector<std::pair<size_t, double> > > > query_data,
    const std::vector<item_processing_info<SimilarityType> >& query_item_info,
    const SimilarityType& similarity,
    ProcessFunction&& process_function,
    size_t num_dimensions,
    size_t max_memory_usage,
    SkipFunction&& skip_pair,
    const dense_bitset* query_mask = nullptr) {

  // The vertex type is used as reference later on.
  typedef typename SimilarityType::item_data_type item_data_type;
  typedef typename SimilarityType::interaction_data_type interaction_data_type;
  typedef typename SimilarityType::final_item_data_type final_item_data_type;
  typedef typename SimilarityType::final_interaction_data_type final_interaction_data_type;

  // Set constants used later.
  static constexpr bool use_final_item_data = (
      !std::is_same<unused_value_type, final_item_data_type>::value);

  static constexpr bool missing_values_are_zero = SimilarityType::missing_values_are_zero();

  // Placeholder passed to finalize_interaction when the similarity type does
  // not use per-item final data (use_final_item_data == false).
  final_item_data_type _unused;

  // constants used later
  size_t max_num_threads = thread::cpu_count();
  size_t num_query_rows = query_data->size();
  size_t num_reference_rows = reference_data->size();
  bool using_mask = (query_mask != nullptr);

  ////////////////////////////////////////////////////////////////////////////////
  // Check input

  DASSERT_NE(num_dimensions, 0);

  DASSERT_EQ(reference_item_info.size(), reference_data->size());
  DASSERT_EQ(query_item_info.size(), query_data->size());

  if(using_mask) {
    DASSERT_EQ(query_mask->size(), query_data->size());
  }

  ////////////////////////////////////////////////////////////////////////////////
  // If we are using a mask, then the number of query rows is
  // calculated from that.
  if(using_mask) {
    num_query_rows = query_mask->popcount();
  }

  // Nothing to do here.
  if(num_query_rows == 0)
    return;

  // Set the block size as a function of the maximum memory usage.
  // Clamp below by the thread count (so every thread can claim a slot) and
  // above by the total number of query rows.
  size_t max_query_rows_per_block = max_memory_usage / (num_dimensions * sizeof(double));
  max_query_rows_per_block = std::max(max_num_threads, max_query_rows_per_block);
  max_query_rows_per_block = std::min(num_query_rows, max_query_rows_per_block);

  // This is needed, as we use an int counter in the inner loop to
  // enable easier autovectorization (see
  // http://www.slideshare.net/linaroorg/using-gcc-autovectorizer).
  max_query_rows_per_block = std::min<size_t>(
      std::numeric_limits<int>::max() / 2, max_query_rows_per_block);

  // Set up the number of blocks.  Number of blocks is the ceiling of
  // all this.
  size_t num_blocks = (num_query_rows + (max_query_rows_per_block - 1)) / max_query_rows_per_block;

  // Now that we have the number of blocks, further minimize memory
  // use by making the number of query rows per block as even as
  // possible.  That way we won't end up with a single block that has
  // like 1 row and the rest that have many more.

  max_query_rows_per_block = (num_query_rows + (num_blocks - 1)) / num_blocks;

  // Get the reader for the query data.
  auto query_reader = query_data->get_reader(max_num_threads);

  // Get the reader for the reference data
  auto reference_reader = reference_data->get_reader(max_num_threads);

  // Set up the query data so that all dimensions are contiguous in
  // memory.  That way, on a query, we can do everything for this
  // element together, thereby optimizing memory access and the
  // increasing the likelihood that the compiler can vectorize it.
  std::vector<double> block_data(max_query_rows_per_block * num_dimensions);
  auto block_data_index = [&](size_t row_idx, size_t element_idx) {
    DASSERT_LT(row_idx, max_query_rows_per_block);
    DASSERT_LT(element_idx, num_dimensions);
    // Dimension-major layout: all rows' values for one dimension are
    // contiguous, which is what the vectorized inner loop below relies on.
    return element_idx * max_query_rows_per_block + row_idx;
  };

  // For all the rows in the current block, this is the actual row
  // index within that block.
  std::vector<size_t> block_query_row_indices(max_query_rows_per_block);

  // The vertex info for each of these rows.
  std::vector<item_data_type> block_item_data(max_query_rows_per_block);

  std::vector<final_item_data_type> block_final_item_data;
  if(use_final_item_data) {
    block_final_item_data.resize(max_query_rows_per_block);
  }

  // Counters indicating where we are within each segment, as each
  // thread reads a new segment.  size_t(-1) marks "not yet
  // initialized"; the counters persist across blocks so a row left
  // unconsumed when a block fills up is picked up by the next block.
  std::vector<size_t> query_row_counters(max_num_threads, size_t(-1));

  // Loop over the blocks.
  for(size_t block_idx = 0; block_idx < num_blocks; ++block_idx) {

    // This is the location of the current open slot for dumping one of the rows
    atomic<size_t> block_write_idx = 0;

    // Clear out the data in this block.  When missing values are not treated
    // as zero, NAN is the sentinel for "dimension absent in this row".
    std::fill(block_data.begin(), block_data.end(), missing_values_are_zero ? 0 : NAN);

    // Fill the block with appropriate rows.
    in_parallel([&](size_t thread_idx, size_t num_threads) GL_GCC_ONLY(GL_HOT_NOINLINE_FLATTEN) {

        // This is the segment we are responsible for in this thread.
        size_t query_row_idx_start = (query_data->size() * thread_idx) / num_threads;
        size_t query_row_idx_end = (query_data->size() * (thread_idx+1)) / num_threads;

        // Get the overall current_query_row_index where we are at within the segment
        // this thread is assigned to.
        size_t& current_query_row_index = query_row_counters[thread_idx];

        // Check for initializing it at the appropriate location.
        if(current_query_row_index == size_t(-1)) {
          // It has not been initialized yet; do it at the start of our segment.
          current_query_row_index = query_row_idx_start;
        }

        // Row buffer.
        std::vector<std::vector<std::pair<size_t, double> > > row_v(1);

        // Now, read in rows until we are out of space in this block,
        // or until we are out of rows in this reading segment.
        while(current_query_row_index < query_row_idx_end) {

          // If we are using the query mask, then check if we are in a
          // valid spot.  If not, then advance forward until we are.
          // NOTE(review): assumed semantics — next_bit() advances its
          // argument to the next set bit and returns false if there is
          // none; confirm against dense_bitset.
          if(using_mask && !query_mask->get(current_query_row_index)) {
            size_t new_q_idx = current_query_row_index;
            bool any_more = query_mask->next_bit(new_q_idx);

            if(UNLIKELY(!any_more || new_q_idx >= query_row_idx_end)) {
              // Done.
              current_query_row_index = query_row_idx_end;
              break;
            } else {
              DASSERT_NE(current_query_row_index, new_q_idx);

              // Next row.
              current_query_row_index = new_q_idx;
            }
          }

          if(using_mask) {
            // Just make sure we've got a live one.
            DASSERT_TRUE(query_mask->get(current_query_row_index));
          }

          // Atomically claim the next open slot in the block
          // (fetch-and-increment; internal_block_idx is this thread's
          // reserved slot).
          size_t internal_block_idx = (++block_write_idx) - 1;

          // Do we have a place to put this?  If not, break and
          // leave this position for the next block.
          if(internal_block_idx >= max_query_rows_per_block) {
            break;
          }

          // Assert that we do indeed have a row left.
          DASSERT_LT(current_query_row_index, query_row_idx_end);

          // Now that we know we have a spot in the block, write it
          // out to the block data.
          query_reader->read_rows(current_query_row_index, current_query_row_index + 1, row_v);
          const auto& row = row_v[0];

          // Write
          block_query_row_indices[internal_block_idx] = current_query_row_index;
          block_item_data[internal_block_idx] = query_item_info[current_query_row_index].item_data;

          // Write out the final vertex data.
          if(use_final_item_data) {
            block_final_item_data[internal_block_idx]
                = query_item_info[current_query_row_index].final_item_data;
          }

          // Write the row out to the block data (densify the sparse row).
          for(size_t i = 0; i < row.size(); ++i) {
            size_t idx = block_data_index(internal_block_idx, row[i].first);
            block_data[idx] = row[i].second;
          }

          // Finally, advance the counter to continue.
          ++current_query_row_index;
        }

        // If we are on the last pass, make sure that we have
        // covered all the query data.
        if(block_idx == num_blocks - 1) {
          DASSERT_EQ(current_query_row_index, query_row_idx_end);
        }
      });

    // Check to make sure our math is correct regarding the number of query
    // rows and the number of blocks.
#ifndef NDEBUG
    {
      size_t _block_write_idx = block_write_idx; // copy out because block_write_idx is atomic
      if(block_idx < num_blocks - 1) {
        DASSERT_GE(_block_write_idx, max_query_rows_per_block);
      } else {
        DASSERT_LE(_block_write_idx, max_query_rows_per_block);
      }

      // Readjust the size of the block (num_query_rows_in_block) if needed.
      if(block_write_idx < max_query_rows_per_block) {
        // Everything is done, so it must be in the last block
        DASSERT_EQ(block_idx, num_blocks - 1);
      }
    }
#endif

    // Set the number of query rows in this block.  The
    // block_write_idx may have been incremented multiple times by
    // different threads.  Overshoot past max_query_rows_per_block is
    // expected (threads that found the block full); clamp it here.
    size_t num_query_rows_in_block = std::min<size_t>(block_write_idx, max_query_rows_per_block);

      // If all the math is correct, this block will never be empty.
    DASSERT_GT(num_query_rows_in_block, 0);

    // Now, if we're using a mask, make sure all the indices are
    // masked properly.
#ifndef NDEBUG
    {
      if(using_mask) {
        for(size_t i = 0; i < num_query_rows_in_block; ++i) {
          DASSERT_TRUE(query_mask->get(block_query_row_indices[i]));
        }
      }
    }
#endif

    // Okay, now that we have a specific block of query data, go
    // through and perform the nearest neighbors query on it.
    in_parallel([&](size_t thread_idx, size_t num_threads) GL_GCC_ONLY(GL_HOT_NOINLINE_FLATTEN) {
        // This is the segment we are responsible for in this thread.
        size_t reference_row_idx_start = (num_reference_rows * thread_idx) / num_threads;
        size_t reference_row_idx_end = (num_reference_rows * (thread_idx+1)) / num_threads;

        // Read reference rows in small batches to amortize reader overhead.
        const size_t n_reference_rows_per_block = 16;
        std::vector<std::vector<std::pair<size_t, double> > > reference_rows_v;

        // One accumulator per query row in the block, reused for each
        // reference row.
        std::vector<interaction_data_type> edges(num_query_rows_in_block);

        // Read it in in blocks of n_reference_rows_per_block rows for efficiency.
        for(size_t outer_idx = reference_row_idx_start;
            outer_idx < reference_row_idx_end;
            outer_idx += n_reference_rows_per_block) {

          ////////////////////////////////////////////////////////////////////////////////

          reference_reader->read_rows(
              outer_idx,
              std::min(outer_idx + n_reference_rows_per_block, reference_row_idx_end),
              reference_rows_v);

          // A short read is only legal on the final batch of the segment.
          if(reference_rows_v.size() != n_reference_rows_per_block) {
            DASSERT_EQ(outer_idx + reference_rows_v.size(), reference_row_idx_end);
          }

          // Now over rows in the buffer.
          for(size_t inner_idx = 0; inner_idx < reference_rows_v.size(); ++inner_idx) {

            // Now, for each row, go through and calculate the full intersection.
            const size_t ref_idx = outer_idx + inner_idx;
            const auto& row = reference_rows_v[inner_idx];

            // Get the information for this particular vertex.
            item_data_type ref_item_data = reference_item_info[ref_idx].item_data;

            const final_item_data_type& ref_final_item_data
                = reference_item_info[ref_idx].final_item_data;

            // Zero the edges.
            edges.assign(num_query_rows_in_block, interaction_data_type());

            // Get the vertex for this one here.
            for(const auto& p : row) {
              size_t dim_index = p.first;
              double ref_value = p.second;

              if(missing_values_are_zero) {
                // This is in the inner loop, so a lot of time is spent
                // in this computation.  Try to make it as friendly as
                // possible to the vectorizer as possible.
                // __restrict__ promises the three arrays don't alias,
                // and the int counter (see note above) keeps the trip
                // count vectorizer-friendly.

                double* __restrict__ bd_ptr = &(block_data[block_data_index(0, dim_index)]);
                item_data_type* __restrict__ it_data_ptr = block_item_data.data();
                interaction_data_type* __restrict__ int_data_ptr = edges.data();

                for(int i = 0; i < int(num_query_rows_in_block);
                    ++i, ++bd_ptr, ++it_data_ptr, ++int_data_ptr) {

                  similarity.update_interaction_unsafe(
                      *int_data_ptr,
                      ref_item_data, *it_data_ptr,
                      ref_value, *bd_ptr);
                }
              } else {
                for(size_t i = 0; i < num_query_rows_in_block; ++i) {
                  // branching on individual entries, so can't do
                  // vectorization here anyway.
                  double block_data_entry = block_data[block_data_index(i, dim_index)];

                  // NAN marks a missing entry (see block fill above).
                  if(std::isnan(block_data_entry))
                    continue;

                  // Aggregate it along this edge.
                  similarity.update_interaction_unsafe(
                      edges[i],
                      ref_item_data, block_item_data[i],
                      ref_value, block_data_entry);
                }
              }
            }

            // Now, go through, finalize the answers, and record them.
            for(size_t i = 0; i < num_query_rows_in_block; ++i) {
              size_t query_index = block_query_row_indices[i];

              if(skip_pair(query_index, ref_idx))
                continue;

              // Get the vertex and value info for this query row.
              const auto& q_item_data = block_item_data[i];
              const auto& q_final_item_data =
                  (use_final_item_data ? block_final_item_data[i] : _unused);

              // Set up the output value.
              final_interaction_data_type e_out = final_interaction_data_type();

              similarity.finalize_interaction(e_out,
                                              ref_final_item_data, q_final_item_data,
                                              edges[i],
                                              ref_item_data, q_item_data);

              // Now do the meat of the operation -- record the result.
              process_function(ref_idx, query_index, e_out);
            }
          }
        }
      });

    // Now, we're done, so go to the next block.
  }
}