fn main()

in eden/mononoke/commit_rewriting/backsyncer/src/main.rs [217:422]

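Entry point of the backsyncer command-line tool. Backsyncer always runs in the large-to-small direction: it replays commits and bookmark moves from a large source repo into a small target repo. Three modes are exposed as subcommands: ARG_MODE_BACKSYNC_ALL (process all pending bookmark moves once and exit), ARG_MODE_BACKSYNC_FOREVER (run as a long-lived service), and ARG_MODE_BACKSYNC_COMMITS (backsync an explicit list of hg commits read from a file).

A typical invocation might look like the sketch below; the repo ids are placeholders, and the flag and subcommand spellings are assumptions based on the ARG_* constants in the source:

    backsyncer --source-repo-id <LARGE_REPO_ID> --target-repo-id <SMALL_REPO_ID> backsync-forever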

// fbinit's proc-macro attribute turns this into the real entry point and
// supplies the `fb: FacebookInit` handle.
#[fbinit::main]
fn main(fb: FacebookInit) -> Result<(), Error> {
    let app_name = "backsyncer cmd-line tool";
    let app = args::MononokeAppBuilder::new(app_name)
        .with_fb303_args()
        .with_source_and_target_repos()
        .build();
    let backsync_forever_subcommand =
        SubCommand::with_name(ARG_MODE_BACKSYNC_FOREVER).about("Backsyncs all new bookmark moves");

    let sync_loop = SubCommand::with_name(ARG_MODE_BACKSYNC_COMMITS)
        .about("Syncs all commits from the file")
        .arg(
            Arg::with_name(ARG_INPUT_FILE)
                .takes_value(true)
                .required(true)
                .help("list of hg commits to backsync"),
        )
        .arg(
            Arg::with_name(ARG_BATCH_SIZE)
                .long(ARG_BATCH_SIZE)
                .takes_value(true)
                .required(false)
                .help("how many commits to backsync at once"),
        );

    let backsync_all_subcommand =
        SubCommand::with_name(ARG_MODE_BACKSYNC_ALL).about("Backsyncs all new bookmark moves once");
    let app = app
        .subcommand(backsync_all_subcommand)
        .subcommand(backsync_forever_subcommand)
        .subcommand(sync_loop);
    let matches = app.get_matches(fb)?;

    let logger = matches.logger();
    let runtime = matches.runtime();
    let config_store = matches.config_store();

    let source_repo_id = args::get_source_repo_id(config_store, &matches)?;
    let target_repo_id = args::get_target_repo_id(config_store, &matches)?;

    let (source_repo_name, _) = args::get_config_by_repoid(config_store, &matches, source_repo_id)?;
    let (target_repo_name, target_repo_config) =
        args::get_config_by_repoid(config_store, &matches, target_repo_id)?;

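    // A single session is shared across modes; each mode derives its own
    // CoreContext from it with mode-appropriate Scuba logging.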
    let session_container = SessionContainer::new_with_defaults(fb);
    let commit_syncer = {
        let scuba_sample = MononokeScubaSampleBuilder::with_discard();
        let ctx = session_container.new_context(logger.clone(), scuba_sample);
        runtime.block_on(create_commit_syncer_from_matches(&ctx, &matches))?
    };

    let mysql_options = matches.mysql_options();
    let readonly_storage = matches.readonly_storage();

    info!(
        logger,
        "syncing from repoid {:?} into repoid {:?}", source_repo_id, target_repo_id,
    );

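    // Configerator-backed live config: commit sync config changes are picked
    // up at runtime without restarting the tool.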
    let live_commit_sync_config = CfgrLiveCommitSyncConfig::new(&logger, config_store)?;

    match matches.subcommand() {
        (ARG_MODE_BACKSYNC_ALL, _) => {
            let scuba_sample = MononokeScubaSampleBuilder::with_discard();
            let ctx = session_container.new_context(logger.clone(), scuba_sample);
            let db_config = target_repo_config.storage_config.metadata;
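            // Open the target repo's metadata DB handles (bookmark update log,
            // mutable counters) that track backsync progress.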
            let target_repo_dbs = runtime.block_on(
                open_backsyncer_dbs(
                    ctx.clone(),
                    commit_syncer.get_target_repo().clone(),
                    db_config,
                    mysql_options.clone(),
                    *readonly_storage,
                )
                .boxed(),
            )?;

            // TODO(ikostia): why do we use discarding ScubaSample for BACKSYNC_ALL?
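            // `BacksyncLimit::NoLimit` drains every pending bookmark move in a
            // single pass; the command then exits.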
            runtime.block_on(
                backsync_latest(ctx, commit_syncer, target_repo_dbs, BacksyncLimit::NoLimit)
                    .boxed(),
            )?;
        }
        (ARG_MODE_BACKSYNC_FOREVER, _) => {
            let db_config = target_repo_config.storage_config.metadata;
            let ctx = session_container
                .new_context(logger.clone(), MononokeScubaSampleBuilder::with_discard());
            let target_repo_dbs = runtime.block_on(
                open_backsyncer_dbs(
                    ctx,
                    commit_syncer.get_target_repo().clone(),
                    db_config,
                    mysql_options.clone(),
                    *readonly_storage,
                )
                .boxed(),
            )?;

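            // Unlike the one-shot modes, the forever mode logs to a real Scuba
            // table, tagging samples with both repos' ids and names.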
            let mut scuba_sample = MononokeScubaSampleBuilder::new(fb, SCUBA_TABLE);
            scuba_sample.add("source_repo", source_repo_id.id());
            scuba_sample.add("source_repo_name", source_repo_name.clone());
            scuba_sample.add("target_repo", target_repo_id.id());
            scuba_sample.add("target_repo_name", target_repo_name.clone());
            scuba_sample.add_common_server_data();

            let ctx = session_container.new_context(logger.clone(), scuba_sample);
            let f = backsync_forever(
                ctx,
                commit_syncer,
                target_repo_dbs,
                source_repo_name,
                target_repo_name,
                live_commit_sync_config,
            )
            .boxed();

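            // Run the sync loop on the runtime while also serving fb303
            // monitoring; `AliveService` reports the process as alive.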
            helpers::block_execute(f, fb, app_name, &logger, &matches, monitoring::AliveService)?;
        }
        (ARG_MODE_BACKSYNC_COMMITS, Some(sub_m)) => {
            let ctx = session_container
                .new_context(logger.clone(), MononokeScubaSampleBuilder::with_discard());
            let inputfile = sub_m
                .value_of(ARG_INPUT_FILE)
                .expect("input file is not set");
            let inputfile = File::open(inputfile)?;
            let file = BufReader::new(&inputfile);
            // Batch size is defined on this subcommand, so read it from
            // `sub_m` rather than the top-level matches.
            let batch_size = args::get_usize(sub_m, ARG_BATCH_SIZE, 100);

            let source_repo = commit_syncer.get_source_repo().clone();

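            // One hg changeset id per line; a malformed line aborts the run.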
            let mut hg_cs_ids = vec![];
            for line in file.lines() {
                hg_cs_ids.push(HgChangesetId::from_str(&line?)?);
            }
            let total_to_backsync = hg_cs_ids.len();
            info!(ctx.logger(), "backsyncing {} commits", total_to_backsync);

            let ctx = &ctx;
            let commit_syncer = &commit_syncer;

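            // Resolve hg ids to bonsai changesets in chunks of `batch_size`,
            // then backsync each chunk concurrently (via FuturesUnordered),
            // folding a running count through the stream for progress logging.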
            let f = stream::iter(hg_cs_ids)
                .chunks(batch_size)
                .map(Result::<_, Error>::Ok)
                .and_then({
                    cloned!(ctx);
                    move |chunk| {
                        cloned!(ctx, source_repo);
                        async move { source_repo.get_hg_bonsai_mapping(ctx.clone(), chunk).await }
                    }
                })
                .try_fold(0, move |backsynced_so_far, hg_bonsai_mapping| {
                    hg_bonsai_mapping
                        .into_iter()
                        .map({
                            move |(_, bonsai)| async move {
                                // Backsyncer is always used in the large-to-small direction,
                                // therefore there can be at most one remapped candidate,
                                // so `CandidateSelectionHint::Only` is a safe choice
                                commit_syncer
                                    .sync_commit(
                                        &ctx,
                                        bonsai.clone(),
                                        CandidateSelectionHint::Only,
                                        CommitSyncContext::Backsyncer,
                                    )
                                    .await?;

                                let maybe_sync_outcome =
                                    commit_syncer.get_commit_sync_outcome(&ctx, bonsai).await?;

                                info!(
                                    ctx.logger(),
                                    "{} backsynced as {:?}", bonsai, maybe_sync_outcome
                                );

                                let maybe_target_cs_id =
                                    extract_cs_id_from_sync_outcome(bonsai, maybe_sync_outcome)?;

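                                // Derive the hg changeset for the synced bonsai
                                // so hg clients can use the result immediately.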
                                derive_target_hg_changesets(ctx, maybe_target_cs_id, commit_syncer)
                                    .await
                            }
                        })
                        .collect::<stream::futures_unordered::FuturesUnordered<_>>()
                        .try_fold(backsynced_so_far, {
                            move |backsynced_so_far, _| async move {
                                info!(
                                    ctx.logger(),
                                    "backsynced so far {} out of {}",
                                    backsynced_so_far + 1,
                                    total_to_backsync
                                );
                                Ok::<_, Error>(backsynced_so_far + 1)
                            }
                        })
                });

            runtime.block_on(f)?;
        }
        _ => {
            bail!("unknown subcommand");
        }
    }

    Ok(())
}