in src/runtime/model.cc [1567:1819]
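// compile() lowers the layer graph into Legion regions and tasks: it
// resolves a parallelization strategy, builds the loss/metrics operators,
// applies inplace and fusion optimizations, materializes tensors and
// partitions, creates the label tensor, and (for training) initializes the
// optimizer and, when enabled, the NCCL communicators.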
void FFModel::compile(LossType loss_type,
const std::vector<MetricsType>& metrics,
CompMode comp_mode)
{
  if (metrics_input == -1) metrics_input = (int)layers.size() - 1;
Context ctx = config.lg_ctx;
Runtime* runtime = config.lg_hlr;
config.computationMode = comp_mode;
if (config.import_strategy_file.length() > 0) {
load_strategies_from_file(config.import_strategy_file, config.strategies);
}
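  // Strategy resolution: strategies imported above seed config.strategies;
  // a positive search budget launches STRATEGY_SEARCH_TASK_ID below, which
  // presumably refines that map before any regions get partitioned. Blocking
  // on get_void_result() keeps the rest of compile() from racing the search.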
if (config.search_budget > 0) {
// Launch the search task
FFModel* model = this;
TaskLauncher launcher(STRATEGY_SEARCH_TASK_ID,
TaskArgument(&model, sizeof(FFModel*)));
Future future = runtime->execute_task(ctx, launcher);
future.get_void_result();
  }
bool repl_labels = (layers[layers.size()-1]->op_type == OP_AGG_SPEC);
loss_op = new Loss(loss_type, repl_labels);
metrics_op = new Metrics(loss_type, metrics);
// Init performance metrics
TaskLauncher launcher(UPDATE_METRICS_TASK_ID, TaskArgument(metrics_op, sizeof(Metrics)));
current_metrics = runtime->execute_task(ctx, launcher);
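  // current_metrics holds the Future produced by UPDATE_METRICS_TASK_ID;
  // later metric updates can presumably chain off this Future instead of
  // blocking here.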
// Perform inplace optimizations
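  // An op's output may alias its input only when (a) producer and consumer
  // share a ParallelConfig, so the aliased regions are partitioned
  // identically, and (b) no other layer reads that same input. The loop
  // below checks exactly these two conditions before do_inplace_output().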
for (size_t l = 1; l < layers.size(); l++) {
if (layers[l]->can_inplace_output()) {
// Assume outputs[0] is inplace with inputs[0]
assert(layers[l]->numOutputs == 1);
if (layers[l]->inputs[0].owner_op != NULL) {
int dim1 = layers[l]->outputs[0].numDim;
int dim2 = layers[l]->inputs[0].numDim;
        ParallelConfig pc1, pc2;
        // Keep find_parallel_config() outside assert() so the lookup still
        // runs when NDEBUG compiles assertions out
        bool found1 = config.find_parallel_config(dim1, layers[l]->name, pc1);
        bool found2 = config.find_parallel_config(
            dim2, layers[l]->inputs[0].owner_op->name, pc2);
        assert(found1 && found2);
if (pc1 == pc2) {
// Check no others also need layers[l]->inputs[0]
bool found = false;
for (size_t i = 0; i < layers.size(); i++) {
if (i == l) continue;
for (int j = 0; j < layers[i]->numInputs; j++) {
              if ((layers[i]->inputs[j].owner_op == layers[l]->inputs[0].owner_op)
                  && (layers[i]->inputs[j].owner_idx == layers[l]->inputs[0].owner_idx)) {
found = true;
}
}
}
if (!found) {
// Perform inplace
layers[l]->do_inplace_output();
}
}
}
}
}
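  // Materialize every op: refresh input tensors first (an upstream op may
  // have recreated its outputs, e.g. during the inplace pass above), then
  // create output regions/partitions and weights. Weights are collected
  // into `parameters` so the optimizer can iterate over them later.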
for (size_t l = 0; l < layers.size(); l++) {
Op* op = layers[l];
for (int i = 0; i < op->numInputs; i++) {
if (op->inputs[i].owner_op == NULL) {
        // User-created tensor: nothing to refresh
} else {
// Refresh op's input tensor
int tsIdx = op->inputs[i].owner_idx;
op->inputs[i] = op->inputs[i].owner_op->outputs[tsIdx];
}
}
op->create_output_and_partition(*this);
op->create_weights(*this);
for (int i = 0; i < op->numWeights; i++) {
parameters.push_back(op->weights[i]);
}
}
// Check correctness
for (size_t l = 0; l < layers.size(); l++) {
Op* op = layers[l];
for (int i = 0; i < op->numOutputs; i++) {
assert(op->outputs[i].owner_op == op);
assert(op->outputs[i].owner_idx == i);
}
}
// Perform fusion optimizations
if (config.perform_fusion) {
fprintf(stderr, "Applying fusion optimizations during compilation...\n");
fprintf(stderr, "%zu layers before fusion...\n", layers.size());
std::vector<Op*> new_layers;
std::vector<Op*> old_layers = layers;
while (apply_fusion(layers, new_layers)) {
for (size_t i = 0; i < new_layers.size(); i++)
for (int idx = 0; idx < new_layers[i]->numInputs; idx++)
for (size_t j = i+1; j < new_layers.size(); j++)
if (new_layers[i]->inputs[idx].owner_op == new_layers[j])
assert(false);
layers = new_layers;
}
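    // apply_fusion runs to a fixpoint; the triple loop above asserts that
    // the fused list stays topologically ordered, i.e. no layer consumes
    // an output of a layer that appears after it.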
// Check integrity
for (size_t l = 0; l < layers.size(); l++) {
if (layers[l]->op_type == OP_FUSED) {
FusedOp* fused = (FusedOp*) layers[l];
int ioff = 0, woff = 0, ooff = 0;
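        // ioff/woff/ooff walk the flattened per-operator index arrays
        // (op_input_idx / op_weight_idx / op_output_idx); each fused
        // operator consumes op_num_* consecutive entries.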
for (int op = 0; op < fused->numOperators; op++) {
Op* old_op = fused->operators[op];
for (int i = 0; i < fused->op_num_inputs[op]; i++) {
int my_off = fused->op_input_idx[i+ioff];
if (fused->op_input_source[i+ioff] == FusedOp::SOURCE_INPUT) {
assert(fused->inputs[my_off].region == old_op->inputs[i].region);
} else if (fused->op_input_source[i+ioff] == FusedOp::SOURCE_OUTPUT) {
assert(fused->outputs[my_off].region == old_op->inputs[i].region);
} else
assert(false);
}
for (int i = 0; i < fused->op_num_weights[op]; i++) {
int my_off = fused->op_weight_idx[i+woff];
assert(fused->op_weight_source[i+woff] == FusedOp::SOURCE_WEIGHT);
assert(fused->weights[my_off].region == old_op->weights[i].region);
}
for (int i = 0; i < fused->op_num_outputs[op]; i++) {
int my_off = fused->op_output_idx[i+ooff];
assert(fused->op_output_source[i+ooff] == FusedOp::SOURCE_OUTPUT);
assert(fused->outputs[my_off].region == old_op->outputs[i].region);
}
ioff += fused->op_num_inputs[op];
woff += fused->op_num_weights[op];
ooff += fused->op_num_outputs[op];
}
} else {
bool found = false;
for (size_t i = 0; i < old_layers.size(); i++) {
if (old_layers[i] == layers[l]) {
assert(!found);
found = true;
}
}
assert(found);
}
}
fprintf(stderr, "%zu layers after fusion...\n", layers.size());
    for (size_t i = 0; i < layers.size(); i++) {
      Op* op = layers[i];
      printf("layer[%zu]: type(%d)\n", i, op->op_type);
      for (int j = 0; j < op->numInputs; j++) {
        LogicalRegion handle = op->inputs[j].region;
        printf("inputs[%d] region(%u,%u,%u)\n", j, handle.get_index_space().get_id(),
               handle.get_field_space().get_id(),
               handle.get_tree_id());
      }
      for (int j = 0; j < op->numOutputs; j++) {
        LogicalRegion handle = op->outputs[j].region;
        printf("outputs[%d] region(%u,%u,%u)\n", j, handle.get_index_space().get_id(),
               handle.get_field_space().get_id(),
               handle.get_tree_id());
      }
      for (int j = 0; j < op->numWeights; j++) {
        LogicalRegion handle = op->weights[j].region;
        printf("weights[%d] region(%u,%u,%u)\n", j, handle.get_index_space().get_id(),
               handle.get_field_space().get_id(),
               handle.get_tree_id());
      }
    }
}
Op* final_layer = layers[layers.size()-1];
// FIXME: currently assume the final layer has exactly one output
assert(final_layer->numOutputs == 1);
//assert(final_layer->outputs[0].numDim == 2);
int dims[MAX_TENSOR_DIM], num_dims;
num_dims = final_layer->outputs[0].numDim;
  // Note that FlexFlow's runtime internally reverses the array ordering
Op* first_layer = layers[0];
int input_dims = first_layer->inputs[0].numDim;
  // FIXME: currently assume the outermost dim of the first layer's first input is the batch size
int batch_size = first_layer->inputs[0].adim[input_dims-1];
dims[0] = batch_size;
for (int i = 1; i < num_dims; i++)
dims[i] = final_layer->outputs[0].adim[num_dims-1-i];
DataType label_type = DT_FLOAT;
if (loss_type == LOSS_SPARSE_CATEGORICAL_CROSSENTROPY) {
// assign dims[num_dims-1] = 1 for sparse categorical labels
dims[num_dims-1] = 1;
label_type = DT_INT32;
}
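  // Worked example (hypothetical shapes): if the final output's adim is
  // {10, 64} (innermost-first: 10 classes, 64 samples) and batch_size is 64,
  // then dims = {64, 10}; with sparse categorical cross-entropy this becomes
  // dims = {64, 1} and the labels are DT_INT32 class indices.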
// create label tensor
switch (num_dims) {
#define DIMFUNC(DIM) \
case DIM: \
{ \
label_tensor = create_tensor<DIM>(dims, label_type); \
label_tensor_with_final_part = label_tensor; \
IndexSpaceT<DIM> task_is = IndexSpaceT<DIM>(\
get_or_create_task_is(DIM, final_layer->name));\
create_disjoint_partition<DIM>(label_tensor_with_final_part,\
task_is, label_tensor_with_final_part.part,\
label_tensor_with_final_part.part_grad);\
break; \
}
LEGION_FOREACH_N(DIMFUNC)
#undef DIMFUNC
default:
{
assert(false && "Unsupported dim");
}
}
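  // (LEGION_FOREACH_N above expands DIMFUNC once per dimensionality Legion
  // was built with, up to LEGION_MAX_DIM, so each supported rank gets its
  // own create_tensor<DIM>/create_disjoint_partition<DIM> instantiation.)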
// init optimizer
assert(optimizer != NULL);
optimizer->init();
#ifdef FF_USE_NCCL
if (config.computationMode == COMP_MODE_TRAINING) {
// init all nccl communicators
std::map<MappingTagID, ParallelConfig>::iterator iter;
for (iter = config.strategies.begin(); iter != config.strategies.end(); iter++) {
// only init nccl for GPU parallel configurations
if (iter->second.device_type != ParallelConfig::GPU) continue;
std::map<MappingTagID, ParallelConfig>::const_iterator it2;
bool found = false;
// Reuse nccl comms for same parallel config
for (it2 = config.strategies.begin(); it2 != iter; it2++) {
if (it2->second == iter->second) {
found = true;
for (int i = 0; i < it2->second.num_parts(); i++)
iter->second.nccl_comms[i] = it2->second.nccl_comms[i];
}
}
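      // Identical ParallelConfigs cover the same set of devices, so their
      // communicators can be shared, presumably saving the cost of another
      // round of NCCL initialization.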
// Create new nccl comms
if (!found) {
TaskLauncher launcher(NCCL_GETUNIQUEID_TASK_ID, TaskArgument(NULL, 0));
Future future = runtime->execute_task(ctx, launcher);
ncclUniqueId ncclId = future.get_result<ncclUniqueId>();
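        // Standard NCCL bootstrap: a single task generates an ncclUniqueId,
        // the index launch fans it out, and each point task presumably calls
        // ncclCommInitRank with the same id; the resulting communicators are
        // gathered from the FutureMap below.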
IndexSpace task_is = get_or_create_task_is(iter->second);
ArgumentMap argmap;
IndexLauncher index_launcher(NCCL_INIT_COMMS_TASK_ID, task_is,
TaskArgument(&ncclId, sizeof(ncclUniqueId)), argmap,
Predicate::TRUE_PRED, false/*must*/, 0/*mapper_id*/,
iter->first/*MappingTagID*/);
FutureMap fm = runtime->execute_index_space(ctx, index_launcher);
fm.wait_all_results();
int idx = 0;
Domain task_domain = runtime->get_index_space_domain(ctx, task_is);
for (Domain::DomainPointIterator it(task_domain); it; it++, idx++) {
iter->second.nccl_comms[idx] = fm.get_result<ncclComm_t>(*it);
}
}
}
}
#endif
}