Error HostManager::addNetwork()

in lib/Runtime/HostManager/HostManager.cpp [251:616]


/// Compiles every Function in \p module and registers each resulting DAG as a
/// network on this HostManager.
///
/// Steps, in order:
/// - reserve the Function names in processingNetworks_;
/// - optionally override backend-specific options;
/// - run target-independent optimizations, unless loading an AOT model or
///   backendSpecificNodeInfo is present;
/// - partition;
/// - optionally run delayed constant folding, then either the DAG optimizer or
///   transformPostOptPipeline();
/// - optionally serialize the compiled DAG;
/// - provision onto the devices;
/// - size the execution-state pools;
/// - publish the DAGs in networks_, all sharing one (possibly stripped) Module.
///
/// \p cctx is not const: cctx.backendOpts.backendSpecificOpts may be replaced,
/// either from loadBackendSpecificOptionsOpt or from its own
/// "loadBackendSpecificOptions" entry.
///
/// \returns an Error if any Function name is already registered or currently
/// being added, or if any compilation/provisioning step fails. If the compiled
/// DAG is serialized and provisioning is skipped (cctx.skipProvisioning, or
/// the DAG optimizer in AOT mode), returns success without registering any
/// network. Names reserved in processingNetworks_ are released on every exit
/// path.
Error HostManager::addNetwork(std::unique_ptr<Module> module,
                              CompilationContext &cctx) {
#ifdef FACEBOOK_INTERNAL
  LOG(INFO) << "Adding Glow network built with revision hash: " << revisionHash;
#endif /* FACEBOOK_INTERNAL */
  VLOG(1) << "addNetwork";
  // On failure, dump the Functions in their current state to help debugging.
  // Dismissed once provisioning (or AOT serialization) has succeeded.
  ScopeGuard debugDumpDAGGuard([&]() {
    if (cctx.dumpFinalGraph) {
      for (Function *F : module->getFunctions()) {
        // StringRef::data() is not guaranteed to be NUL-terminated, so go
        // through std::string before handing it to %s.
        auto fname = strFormat("%sfinal_graph_dbg_err_%s.dot",
                               cctx.dumpGraphPath.c_str(),
                               F->getName().str().c_str());
        LOG(INFO) << "Dumping final graph due to error to " << fname;
        F->dumpDAG(fname);
      }
    }
  });

  /// If specified in the cctx, this will prevent Constants from being modified
  /// until the current scope ends or the preventer is dismissed. Does so by
  /// swapping in temporary Placeholders instead of Constants.
  ConstantModificationPreventer constModPreventer(*module, cctx);
  if (cctx.optimizationOpts.delayAndRecordConstantModification) {
    constModPreventer.activate();
  }

  // Reserve every Function name in processingNetworks_. While a name is
  // reserved, concurrent addNetwork() calls cannot add a network with it.
  std::vector<std::string> names;
  {
    std::unique_lock<std::shared_timed_mutex> networkLock(networkLock_);
    for (Function *F : module->getFunctions()) {
      std::string name = F->getName().str();
      auto it = networks_.find(name);
      if (it != networks_.end() ||
          processingNetworks_.find(name) != processingNetworks_.end()) {
        cleanupAddNetwork(names);
        return MAKE_ERR(
            ErrorValue::ErrorCode::RUNTIME_ERROR,
            "Failed to add network: already have a function called " + name);
      }
      // Add the network to processingNetworks_ so we know it's being worked on.
      processingNetworks_.insert(name);
      names.push_back(name);
    }
  }

  // From here on, every early return must release the reserved names. This
  // includes the returns hidden inside the RETURN_* macros. Otherwise the
  // names stay in processingNetworks_ and can never be added again. The two
  // paths that release them explicitly while holding the lock dismiss this
  // guard. Every lock taken later lives in a nested scope, so it is destroyed
  // before this guard runs; re-acquiring networkLock_ here cannot self-deadlock.
  ScopeGuard processingNetworksGuard([&]() {
    std::unique_lock<std::shared_timed_mutex> networkLock(networkLock_);
    cleanupAddNetwork(names);
  });

  // Issue a warning when loading backend specific options from the command line
  // and the compile context also contains backend specific options.
  if (!loadBackendSpecificOptionsOpt.empty()) {
    if (cctx.backendOpts.backendSpecificOpts.size() != 0) {
      VLOG_EVERY_N(1, 1000) << "Warning: backendSpecificOpts is set via the "
                               "HostManager, ignoring previously set options.";
    }
    cctx.backendOpts.backendSpecificOpts =
        deserializeStrStrMapFromYaml(loadBackendSpecificOptionsOpt);
  } else {
    auto ctxLoadBackendSpecificOpt =
        cctx.backendOpts.backendSpecificOpts.find("loadBackendSpecificOptions");

    if (ctxLoadBackendSpecificOpt !=
        cctx.backendOpts.backendSpecificOpts.end()) {
      cctx.backendOpts.backendSpecificOpts =
          deserializeStrStrMapFromYaml(ctxLoadBackendSpecificOpt->second);
    }
  }

  // Snapshot the per-device information the partitioner needs.
  std::vector<DeviceInfo> deviceInfo;
  {
    std::unique_lock<std::shared_timed_mutex> networkLock(networkLock_);
    for (auto &device : availableDevices_) {
      DeviceInfo info = devices_[device]->getDeviceInfo();
      info.availableMemory = devices_[device]->getAvailableMemory();
      info.backendName = devices_[device]->getBackendName().str();
      info.nonSupportedNodes =
          devices_[device]->getParamByName("nonSupportedNodes").str();
      info.supportedNodes =
          devices_[device]->getParamByName("supportedNodes").str();
      // If p2p is enabled update the inputCount limit.
      if (cctx.enableP2P) {
        info.inputCountMax = P2PInputLimit;
      }
      deviceInfo.push_back(info);
    }
  }

  // Optimize Functions only if we don't have any backendSpecificNodeInfo,
  // because if we do then the Functions were already optimized and Nodes had
  // extra info mapped to them, so we don't want to mutate the Function. Also
  // skip optimizations if we're loading an AOT optimized model.
  const bool skipOptimizations =
      cctx.loadingAOTModel || !cctx.backendOpts.backendSpecificNodeInfo.empty();

  // Perform a round of target-independent graph optimizations. This helps the
  // partitioner to do its job more efficiently.
  if (!skipOptimizations) {
    for (Function *F : module->getFunctions()) {
      RETURN_IF_ERR(optimizeFunctionBeforeLowering(F, cctx));
    }
  }
  VLOG(1) << "Before partitioner";
  Partitioner partitioner(module.get(), deviceInfo, skipOptimizations);
  // Note: `backend` refers into provisioner_, which the profiling branch below
  // may reset. It must not be used after that branch.
  auto backendName = devices_[0]->getBackendName();
  const auto &backend = provisioner_->getBackend(backendName);
  auto contextCount = backend.getContextCount(cctx);
  partitioner.setContextCount(contextCount);
  DAGListTy nodeList;
  auto result = partitioner.partition(cctx);
  VLOG(1) << "After partitioner";
  if (result) {
    nodeList = std::move(result.get());
  } else {
    RETURN_ERR(result.takeError());
  }
  VLOG(1) << "Before quantmode";
  if (cctx.precisionConfig.quantMode == QuantizationMode::Profile) {
    // Since for profiling the provisioner will be reset, we only allow one
    // network in one HM. networks_ is shared state, so inspect it under the
    // lock.
    {
      std::unique_lock<std::shared_timed_mutex> networkLock(networkLock_);
      if (networks_.size() > 0) {
        return MAKE_ERR(ErrorValue::ErrorCode::RUNTIME_ERROR,
                        "For quantization profiling flow, there can't be other "
                        "registered networks before this one");
      }
    }
    // For profiling, we use CPU backend. Overwrite Provisioner and Executor
    // to force the network is compiled and run in profilingBackend. backend.
    size_t devicesNum = devices_.size();
    for (size_t i = 0; i < devicesNum; i++) {
      auto name = devices_[i]->getDeviceConfig().name;
      auto config = glow::make_unique<DeviceConfig>(profilingBackend, name);
      devices_[i] = std::unique_ptr<DeviceManager>(
          DeviceManager::createDeviceManager(*config));
      RETURN_IF_ERR(devices_[i]->init());
    }
    provisioner_.reset(new Provisioner(devices_));
    executor_.reset(new ThreadPoolExecutor(devices_, config_.executorThreads));
  }

  VLOG(1) << "Before replace dummy TQPs";
  // Now that we've partitioned and optimized, do some verification based on the
  // dummy mode we're using, if any.
  if (cctx.precisionConfig.replaceDummyTQPs ||
      cctx.precisionConfig.loadUniquedDummyQParams) {
    RETURN_IF_ERR(module->verifyDummyQParams(
        cctx.precisionConfig.loadUniquedDummyQParams));
  }

  // If we are loading an AOT model where we are replacing dummy TQPs, then we
  // may need to update Relu output types on FCs, since they should be set to
  // use zero as min but the correct qparams could not be calculated AOT.
  if (cctx.loadingAOTModel && cctx.precisionConfig.replaceDummyTQPs) {
    LOG(INFO) << "Updating quantized Relu types given real TQPs";
    for (Function *F : module->getFunctions()) {
      updateQuantReluTypes(F);
    }
  }

  VLOG(1) << "Before constant folding";
  // If we prevented constant modification then run constant folding with
  // recording now. Record so that if we are going to serialize we can embed the
  // constant folding subgraphs in the Glow ONNX model.
  ConstantFoldingRecordMap record;
  if (cctx.optimizationOpts.delayAndRecordConstantModification) {
    constModPreventer.deactivateAndCleanup();

    RETURN_ERR_IF_NOT(nodeList.size() == 1, "Expect only one DAG.");
    const auto &dag = *nodeList.begin();
    for (auto &dagNode : dag.nodes) {
      Function *F = module->getFunction(dagNode->name);
      RETURN_ERR_IF_NOT(
          F, strFormat("Function %s not found", dagNode->name.data()));

      ConstantFoldingRecordMap currRecord = constantFoldAndRecord(F, cctx);
      record.insert(currRecord.begin(), currRecord.end());
      runDCEPass(F, cctx);

      // Verify the Function is valid after constant folding takes place.
      Backend &B = provisioner_->getBackend(dagNode->backendName);
      RETURN_ERR_IF_NOT(
          B.verify(*F, cctx.verboseCompile),
          "Unsupported node(s) found after delayed constant folding Function " +
              F->getName().str() + " for backend " + B.getBackendName());
    }
  }
  VLOG(1) << "Before loading AOT";
  if (!cctx.loadingAOTModel) {
    if (cctx.callDAGOptimizer) {
      // NOTE(review): without FACEBOOK_INTERNAL this branch does nothing, so
      // transformPostOptPipeline() is not run either -- confirm this is
      // intended for open-source builds.
#if FACEBOOK_INTERNAL
      auto optDagErr = optimizeDAG(nodeList, *provisioner_, *module, deviceInfo,
                                   cctx, record);
      RETURN_IF_ERR(optDagErr);
#endif /* FACEBOOK_INTERNAL */
    } else {
      // If not using the DAG optimizer, iterate over the DAGs and call
      // transformPostOptPipeline() on the Functions.
      VLOG(1) << "No DAG optimizer";
      for (const auto &dag : nodeList) {
        for (auto &dagNode : dag.nodes) {
          Function *F = module->getFunction(dagNode->name);
          RETURN_ERR_IF_NOT(
              F, strFormat("Function %s not found", dagNode->name.data()));

          if (cctx.optimizationOpts.onlyLowerFuns.count(F)) {
            continue;
          }

          Backend &B = provisioner_->getBackend(dagNode->backendName);
          RETURN_IF_EXPECTED_IS_ERR(B.transformPostOptPipeline(F, cctx));

          RETURN_ERR_IF_NOT(
              B.verify(*F, cctx.verboseCompile),
              "Unsupported node(s) found after transformPostOptPipeline() " +
                  F->getName().str() + " for backend " + B.getBackendName());
        }
      }
    }
  }

  VLOG(1) << "Before serialize compile DAG";
  // If requested, serialize the resulting DAG that was just optimized and
  // partitioned.
  if (cctx.serializeCompiledDAG) {
    std::string loc;
    const char *envSpecifiedSerializationPath =
        getenv("GLOW_DAG_SERIALIZATION_LOC");
    if (!envSpecifiedSerializationPath) {
      // The default location is derived from the first DAG's root name, so
      // there must be at least one DAG to dereference.
      RETURN_ERR_IF_NOT(!nodeList.empty(),
                        "Cannot serialize compiled DAG: DAG list is empty.");
      loc = nodeList.begin()->root->name + ".onnxtxt";
    } else {
      loc = std::string(envSpecifiedSerializationPath);
    }

    LOG(INFO) << "Serializing final compiled DAG to " << loc;
    {
      llvm::StringMap<std::string> extraMetadataProps;
      if (cctx.precisionConfig.originNameToTQPMap) {
        RETURN_IF_ERR(ONNXModelWriter::insertLoaderNameUniqueOffsetMetadata(
            extraMetadataProps, *cctx.precisionConfig.originNameToTQPMap));
      }
      if (cctx.precisionConfig.clipQuantRangeToFP16) {
        extraMetadataProps[clipQuantRangeToFP16Key] = "1";
      }
      Error writeErr = Error::empty();
      // Note: If cctx.skipProvisioning then we want to serialize all meta info
      // as we are likely doing AOT optimization. Otherwise do not provide the
      // meta info as the model does not need to be reloaded.
      ONNXModelWriter onnxWR(
          loc, nodeList, 7, 9, &writeErr,
          /* textMode */ true,
          /* zipMode */ cctx.useZipModeForSerializeCompiledDAG,
          /* includeConstantData */ cctx.saveConstantInSerializeCompiledDAG,
          extraMetadataProps, record, cctx.backendOpts.backendSpecificNodeInfo,
          cctx.skipProvisioning ? &cctx.loadedPHNames : nullptr,
          cctx.skipProvisioning ? &cctx.staticPlaceholderTypesForAOT : nullptr,
          cctx.returnGlowSerializedModelStr
              ? cctx.glowAOTSerializationModelStrPtr.get()
              : nullptr);
      RETURN_IF_ERR(writeErr);
    }

    // If we're using AOT DAG optimizer then skip provisioning.
    if (cctx.skipProvisioning ||
        (cctx.callDAGOptimizer && cctx.useDAGOptimizerAOTMode)) {
      LOG(INFO) << "Host manager skipping provisioning";
      {
        std::unique_lock<std::shared_timed_mutex> networkLock(networkLock_);
        cleanupAddNetwork(names);
      }
      // Names were released explicitly above; don't release them twice.
      processingNetworksGuard.dismiss();
      debugDumpDAGGuard.dismiss();
      cleanupConstantFolding(*module, record);
      if (cctx.dumpFinalGraph) {
        for (Function *F : module->getFunctions()) {
          auto fname =
              strFormat("%sfinal_graph_aot_%s.dot", cctx.dumpGraphPath.c_str(),
                        F->getName().str().c_str());
          LOG(INFO) << "Dumping final graph to " << fname;
          F->dumpDAG(fname);
        }
      }
      return Error::success();
    }
  }

  // Now that we've serialized the model if requested, cleanup the temporary
  // Functions and PHs used for constant folding.
  cleanupConstantFolding(*module, record);
  VLOG(1) << "Before provisioning";
  auto err = provisioner_->provision(nodeList, *module, cctx);
  if (err) {
    if (err.peekErrorValue()->isFatalError()) {
      statsExporterRegistry_->setCounter(kDeviceFatalError, 1);
    }
    RETURN_ERR(err);
  }
  debugDumpDAGGuard.dismiss();
  VLOG(1) << "Calculation of maxActiveRequests";
  {
    std::unique_lock<std::shared_timed_mutex> networkLock(networkLock_);
    /// Calculate networkMaxActive requests. Then update
    /// config_.maxActiveRequests This will be maxActiveRequestsPerInstance *
    /// instanceCount * minReplications or config_.maxActiveRequests whichever
    /// is smaller.

    // Find the minimum on device replication.
    // NOTE(review): starting from 1 caps the result at 1, so replication never
    // raises the pool size. Confirm whether the intent was to start from the
    // maximum value instead.
    unsigned minReplications{1};
    for (auto &node : nodeList) {
      for (auto &dag : node.nodes) {
        minReplications = std::min(dag->replicationCount, minReplications);
      }
    }
    unsigned product{0};
    if (nodeList.size() && nodeList[0].nodes.size()) {
      product = nodeList[0].nodes[0]->instanceCount *
                cctx.maxActiveRequestsPerInstance * minReplications;
    } else {
      return MAKE_ERR(ErrorValue::ErrorCode::RUNTIME_ERROR,
                      "NodeList is empty.");
    }
    unsigned maxActiveRequests = config_.maxActiveRequests;
    config_.maxActiveRequests = std::min(product, maxActiveRequests);

    // Create pool of cachedExecutionStates.
    for (auto &node : nodeList) {
      // Note: currently getNextNetworkExecutionState assumes that pool size is
      // >= currentInFlight requests, so we set pool size to maxActiveRequests.
      executor_->createPool(node.root.get(), config_.maxActiveRequests,
                            cctx.enableP2P, cctx.enableDRT);
    }
  }
  // Clear constants contents from the module then put it in a
  // shared_ptr to be shared between all of the networks created from each
  // function in the module. Query devices_[0] again, since the profiling branch
  // above may have replaced devices_ and provisioner_.
  auto targetBackendName = std::string(devices_[0]->getBackendName());
  const auto &targetBackend = provisioner_->getBackend(targetBackendName);
  if (targetBackend.shouldStripModule() && !cctx.skipModuleStrip) {
    module->strip();
  }
  VLOG(1) << "Cleanup";
  auto sharedModule = std::shared_ptr<Module>(std::move(module));
  {
    std::unique_lock<std::shared_timed_mutex> networkLock(networkLock_);
    for (auto &node : nodeList) {
#if FACEBOOK_INTERNAL
      LOG(INFO) << "Successfully compiled and provisioned " << node.root->name;
#endif
      auto &networkData = networks_[(node.root)->name];
      networkData.dag = std::move(node);
      networkData.module = sharedModule;
    }
    // Publish the networks and release the reservation in one critical
    // section, so each name is always visible in at least one of networks_ or
    // processingNetworks_.
    cleanupAddNetwork(names);
  }
  processingNetworksGuard.dismiss();
  VLOG(1) << "After cleanup";
  return Error::success();
}