void run()

in src/toolkits/graph_analytics/label_propagation.cpp [77:347]


  void run(sgraph& g, size_t& num_iter, double& average_l2_delta) {

    /// Type defs
    typedef std::vector<std::vector<flexible_type>> flex_column_type;

    /**
     * Graph related constant data
     */
    size_t num_classes = 0;
    size_t num_partitions = g.get_num_partitions();

    // Vertex labels from input data
    flex_column_type vertex_labels = g.fetch_vertex_data_field_in_memory(label_field);

    std::atomic<size_t> num_labeled_vertices(0);
    size_t num_unlabeled_vertices = 0;

    /// Initialization and check vertex labels
    /// Type check:
    /// "label_field" must be integer and starts from 0
    try {
      size_t min_class = (size_t)(-1);
      size_t max_class = 0;
      typedef std::pair<size_t, size_t> min_max_pair;

      auto min_max_reducer = [&](const flexible_type& v, min_max_pair& reducer) {
        if (!v.is_na()) {
          num_labeled_vertices++;
          reducer.first = std::min<size_t>((size_t)(v), reducer.first);
          reducer.second = std::max<size_t>((size_t)(v), reducer.second);
        }
        return true;
      };
      for (size_t i = 0; i < num_partitions; ++i) {
        auto sa = g.vertex_partition(i).select_column(label_field);
        auto partial_reduce = turi::reduce<min_max_pair>(*sa, min_max_reducer,
                                                             min_max_pair{(size_t)(-1), 0});
        for (const auto& result: partial_reduce) {
          min_class = std::min<size_t>(min_class, result.first);
          max_class = std::max<size_t>(max_class, result.second);
        }
      }
      ASSERT_EQ(min_class, 0);
      num_classes = max_class + 1;
    } catch (...) {
      log_and_throw("class label must be [0, num_classes)");
    }
    logprogress_stream << "Num classes: " << num_classes << std::endl;
    if (num_classes > MAX_CLASSES)  {
      log_and_throw("Too many classes provided. Label propagation works with maximal 1000 classes.");
    }

    num_unlabeled_vertices = g.num_vertices() - num_labeled_vertices;
    logprogress_stream << "#labeled_vertices: " << (size_t)num_labeled_vertices
                       << "\t#unlabeled_vertices: " << num_unlabeled_vertices
                       << std::endl;
    if (num_unlabeled_vertices == 0)
      logprogress_stream << "Warning: all vertices are already labeled" << std::endl;


    std::vector<size_t> size_of_partition;
    for (size_t i = 0; i < num_partitions; ++i) {
      size_of_partition.push_back(g.vertex_partition(i).size());
    }

    /**
     * In memory vertex data storing the probability of each class for each vertex.
     *
     * current_label_pb[i] is a dense eigen matrix (row major ordering),
     * storing the probability for vertices in partition i.
     *
     * prev_label_pb is a store the copy of the previous iteration.
     */
    typedef Eigen::Matrix<FLOAT_TYPE, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor> matrix_type;
    std::unique_ptr<std::vector<matrix_type>> current_label_pb(
        new std::vector<matrix_type>(num_partitions));
    std::unique_ptr<std::vector<matrix_type>> prev_label_pb(
        new std::vector<matrix_type>(num_partitions));
    std::vector<std::vector<mutex>> vertex_locks(num_partitions);

    // Initialize class probability to zero
    for (size_t i = 0; i < num_partitions; ++i) {
      (*current_label_pb)[i] = matrix_type::Zero(size_of_partition[i], num_classes);
      (*prev_label_pb)[i] = matrix_type::Zero(size_of_partition[i], num_classes);
      vertex_locks[i].resize(size_of_partition[i]);
    }

    // Set the initial label probability
    // For labeled vertices p[k] = 1.0 if k is the given label
    // For unlabeled vertices p[k] = 1/num_classes for all k

    static double BASELINE_PROB = 1.0 / num_classes;
    for (size_t i = 0; i < num_partitions; ++i) {
      size_t num_vertices_in_partition = vertex_labels[i].size();
      parallel_for (0, num_vertices_in_partition, [&](size_t j) {
        if (!vertex_labels[i][j].is_na()) {
          size_t class_label = vertex_labels[i][j];
          if (class_label >= num_classes) {
            log_and_throw("Class label must be [0, num_classes)");
          }
          (*prev_label_pb)[i](j, class_label) = 1.0;
        } else {
          // uniform
          (*prev_label_pb)[i].row(j).setConstant(BASELINE_PROB);
        }
      });
    }

    bool use_edge_weight = !weight_field.empty();

    /**
     * Define one iteration of label propagation in triple apply:
     *
     * \code
     * foreach (src, edge, dst):
     *   current_label_pb[:][dst.addr] += prev_label_pb[:][src.addr] * edge.weight
     * \endcode
     */
    sgraph_compute::fast_triple_apply_fn_type apply_fn =
         [&](sgraph_compute::fast_edge_scope& scope) {
           const auto source_addr = scope.source_vertex_address();
           const auto target_addr = scope.target_vertex_address();

           FLOAT_TYPE weight = use_edge_weight ? (flex_float)scope.edge()[2] : 1.0;

           {
             auto delta = (*prev_label_pb)[source_addr.partition_id].row(source_addr.local_id) * weight;
             auto& mtx = vertex_locks[target_addr.partition_id][target_addr.local_id];
             std::lock_guard<mutex> lock(mtx);
             (*current_label_pb)[target_addr.partition_id].row(target_addr.local_id) += delta;
           }

           // Propagate from target to source
           if (undirected) {
             auto delta = (*prev_label_pb)[target_addr.partition_id].row(target_addr.local_id) * weight;
             auto& mtx = vertex_locks[source_addr.partition_id][source_addr.local_id];
             std::lock_guard<mutex> lock(mtx);
             (*current_label_pb)[source_addr.partition_id].row(source_addr.local_id) += delta;
           }
         };

    // Done with all initializations, this is the real for loop
    table_printer table({{"Iteration", 0}, {"Average l2 change in class probability", 0}});
    table.print_header();
    size_t iter = 0;
    while (true) {
      if (max_iterations > 0 && (int)iter >= max_iterations)
        break;

      ++iter;
      if(cppipc::must_cancel()) {
        log_and_throw(std::string("Toolkit cancelled by user."));
      }

      // Stores the total l2 diff in label probability
      turi::atomic<FLOAT_TYPE> total_l2_diff = 0.0;

      // initialize with the self weight of the the previous label value
      for (size_t i = 0; i < num_partitions; ++i) {
        (*current_label_pb)[i] = (*prev_label_pb)[i] * self_weight;
      }

      // Label Propagation
      if (weight_field.empty()) {
        sgraph_compute::fast_triple_apply(g, apply_fn, {}, {});
      } else {
        sgraph_compute::fast_triple_apply(g, apply_fn, {weight_field}, {});
      }

      // Post processing:
      // 1. Normalize to probability
      // 2. Clamp labeled data
      for (size_t i = 0; i < num_partitions; ++i) {
        size_t num_vertices_in_partition = vertex_labels[i].size();
        parallel_for (0, num_vertices_in_partition, [&](size_t j) {
          if (!vertex_labels[i][j].is_na()) {
            (*current_label_pb)[i].row(j).setZero();
            size_t class_label = vertex_labels[i][j];
            (*current_label_pb)[i](j, class_label) = 1.0;
          } else {
            (*current_label_pb)[i].row(j) /= (*current_label_pb)[i].row(j).sum();
          }
        });
        auto diff = (((*current_label_pb)[i] - (*prev_label_pb)[i]).rowwise().norm().sum());
        total_l2_diff += diff;
      }

      // swap the current label and the prev label
      std::swap(current_label_pb, prev_label_pb);

      // store iteration and delta
      num_iter = iter;
      average_l2_delta =
          num_unlabeled_vertices > 0 ? total_l2_diff / num_unlabeled_vertices : 0.0;

      table.print_row(iter, average_l2_delta);

      if (average_l2_delta < threshold)
        break;
    } // end of label_propagation iterations
    table.print_footer();

    // Free some memory
    (*current_label_pb).clear();

    // Compute the predicted label by taking the argmax of each probabilty vector
    static const double EPSILON = 1e-10;
    flex_column_type predicted_labels = sgraph_compute::create_vertex_data_from_const<flexible_type>(g, 0);
    for (size_t i = 0; i < num_partitions; ++i) {
      size_t num_vertices_in_partition = vertex_labels[i].size();
      std::vector<flexible_type>& preds = predicted_labels[i];
      matrix_type& mat = (*prev_label_pb)[i];
      parallel_for(0, num_vertices_in_partition, [&](size_t rowid) {
        // Get the index of the largest value in the row, and store it in preds[rowid]
        int best_class_id = -1;
        mat.row(rowid).maxCoeff(&best_class_id);
        // If we still get uniform distribution, output NONE
        if (fabs(mat(rowid, best_class_id) - BASELINE_PROB) < EPSILON) {
          preds[rowid] = FLEX_UNDEFINED;
        } else {
          preds[rowid] = best_class_id;
        }
      });
    }
    g.add_vertex_field(predicted_labels, PREDICTED_LABEL_COLUMN_NAME, flex_type_enum::INTEGER);

    // Write the probability vector back to graph vertex data
    typedef Eigen::Map<Eigen::Matrix<double, 1, Eigen::Dynamic, Eigen::RowMajor>>
        vector_buffer_type;

    // output_columns[class_index][partition_index]
    std::vector<std::vector<std::shared_ptr<sarray<flexible_type>>>> output_columns(num_classes);
    for (auto& c : output_columns) c.resize(num_partitions);

    parallel_for(0, num_partitions, [&](size_t i) {
      size_t num_rows = (*prev_label_pb)[i].rows();

      std::vector<sarray<flexible_type>::iterator>
        out_iterators(num_classes);

      // prepare sarray and writer
      for (size_t k = 0; k < num_classes; ++k) {
        auto sa = std::make_shared<sarray<flexible_type>>();
        sa->open_for_write(1);
        sa->set_type(flex_type_enum::FLOAT);
        out_iterators[k] = sa->get_output_iterator(0);
        output_columns[k][i] = sa;
      }

      // write to sarray
      flex_vec raw_buffer(num_classes);
      vector_buffer_type mapped_buffer(&(raw_buffer[0]),
                                       num_classes);
      for (size_t j = 0; j < num_rows; ++j) {
        mapped_buffer = ((*prev_label_pb)[i].row(j)).template cast<double>();
        for (size_t k = 0; k < num_classes; ++k) {
          *(out_iterators[k])++ = raw_buffer[k];
        }
      }

      // close sarray
      for (size_t k = 0; k < num_classes; ++k)
        output_columns[k][i]->close();
    });

    for (size_t k = 0; k < num_classes; ++k) {
      std::string column_name = LABEL_COLUMN_PREFIX + std::to_string(k);
      g.add_vertex_field(output_columns[k], column_name);
    }
    // Done
  }