in eden/mononoke/commit_rewriting/backsyncer/src/main.rs [217:422]
fn main(fb: FacebookInit) -> Result<(), Error> {
let app_name = "backsyncer cmd-line tool";
let app = args::MononokeAppBuilder::new(app_name)
.with_fb303_args()
.with_source_and_target_repos()
.build();
let backsync_forever_subcommand =
SubCommand::with_name(ARG_MODE_BACKSYNC_FOREVER).about("Backsyncs all new bookmark moves");
let sync_loop = SubCommand::with_name(ARG_MODE_BACKSYNC_COMMITS)
.about("Syncs all commits from the file")
.arg(
Arg::with_name(ARG_INPUT_FILE)
.takes_value(true)
.required(true)
.help("list of hg commits to backsync"),
)
.arg(
Arg::with_name(ARG_BATCH_SIZE)
.long(ARG_BATCH_SIZE)
.takes_value(true)
.required(false)
.help("how many commits to backsync at once"),
);
let backsync_all_subcommand =
SubCommand::with_name(ARG_MODE_BACKSYNC_ALL).about("Backsyncs all new bookmark moves once");
let app = app
.subcommand(backsync_all_subcommand)
.subcommand(backsync_forever_subcommand)
.subcommand(sync_loop);
let matches = app.get_matches(fb)?;
let logger = matches.logger();
let runtime = matches.runtime();
let config_store = matches.config_store();
let source_repo_id = args::get_source_repo_id(config_store, &matches)?;
let target_repo_id = args::get_target_repo_id(config_store, &matches)?;
let (source_repo_name, _) = args::get_config_by_repoid(config_store, &matches, source_repo_id)?;
let (target_repo_name, target_repo_config) =
args::get_config_by_repoid(config_store, &matches, target_repo_id)?;
let session_container = SessionContainer::new_with_defaults(fb);
let commit_syncer = {
let scuba_sample = MononokeScubaSampleBuilder::with_discard();
let ctx = session_container.new_context(logger.clone(), scuba_sample);
runtime.block_on(create_commit_syncer_from_matches(&ctx, &matches))?
};
let mysql_options = matches.mysql_options();
let readonly_storage = matches.readonly_storage();
info!(
logger,
"syncing from repoid {:?} into repoid {:?}", source_repo_id, target_repo_id,
);
let config_store = matches.config_store();
let live_commit_sync_config = CfgrLiveCommitSyncConfig::new(&logger, config_store)?;
match matches.subcommand() {
(ARG_MODE_BACKSYNC_ALL, _) => {
let scuba_sample = MononokeScubaSampleBuilder::with_discard();
let ctx = session_container.new_context(logger.clone(), scuba_sample);
let db_config = target_repo_config.storage_config.metadata;
let target_repo_dbs = runtime.block_on(
open_backsyncer_dbs(
ctx.clone(),
commit_syncer.get_target_repo().clone(),
db_config,
mysql_options.clone(),
*readonly_storage,
)
.boxed(),
)?;
// TODO(ikostia): why do we use discarding ScubaSample for BACKSYNC_ALL?
runtime.block_on(
backsync_latest(ctx, commit_syncer, target_repo_dbs, BacksyncLimit::NoLimit)
.boxed(),
)?;
}
(ARG_MODE_BACKSYNC_FOREVER, _) => {
let db_config = target_repo_config.storage_config.metadata;
let ctx = session_container
.new_context(logger.clone(), MononokeScubaSampleBuilder::with_discard());
let target_repo_dbs = runtime.block_on(
open_backsyncer_dbs(
ctx,
commit_syncer.get_target_repo().clone(),
db_config,
mysql_options.clone(),
*readonly_storage,
)
.boxed(),
)?;
let mut scuba_sample = MononokeScubaSampleBuilder::new(fb, SCUBA_TABLE);
scuba_sample.add("source_repo", source_repo_id.id());
scuba_sample.add("source_repo_name", source_repo_name.clone());
scuba_sample.add("target_repo", target_repo_id.id());
scuba_sample.add("target_repo_name", target_repo_name.clone());
scuba_sample.add_common_server_data();
let ctx = session_container.new_context(logger.clone(), scuba_sample);
let f = backsync_forever(
ctx,
commit_syncer,
target_repo_dbs,
source_repo_name,
target_repo_name,
live_commit_sync_config,
)
.boxed();
helpers::block_execute(f, fb, app_name, &logger, &matches, monitoring::AliveService)?;
}
(ARG_MODE_BACKSYNC_COMMITS, Some(sub_m)) => {
let ctx = session_container
.new_context(logger.clone(), MononokeScubaSampleBuilder::with_discard());
let inputfile = sub_m
.value_of(ARG_INPUT_FILE)
.expect("input file is not set");
let inputfile = File::open(inputfile)?;
let file = BufReader::new(&inputfile);
let batch_size = args::get_usize(&matches, ARG_BATCH_SIZE, 100);
let source_repo = commit_syncer.get_source_repo().clone();
let mut hg_cs_ids = vec![];
for line in file.lines() {
hg_cs_ids.push(HgChangesetId::from_str(&line?)?);
}
let total_to_backsync = hg_cs_ids.len();
info!(ctx.logger(), "backsyncing {} commits", total_to_backsync);
let ctx = &ctx;
let commit_syncer = &commit_syncer;
let f = stream::iter(hg_cs_ids.clone())
.chunks(batch_size)
.map(Result::<_, Error>::Ok)
.and_then({
cloned!(ctx);
move |chunk| {
cloned!(ctx, source_repo);
async move { source_repo.get_hg_bonsai_mapping(ctx.clone(), chunk).await }
}
})
.try_fold(0, move |backsynced_so_far, hg_bonsai_mapping| {
hg_bonsai_mapping
.into_iter()
.map({
move |(_, bonsai)| async move {
// Backsyncer is always used in the large-to-small direction,
// therefore there can be at most one remapped candidate,
// so `CandidateSelectionHint::Only` is a safe choice
commit_syncer
.sync_commit(
&ctx,
bonsai.clone(),
CandidateSelectionHint::Only,
CommitSyncContext::Backsyncer,
)
.await?;
let maybe_sync_outcome =
commit_syncer.get_commit_sync_outcome(&ctx, bonsai).await?;
info!(
ctx.logger(),
"{} backsynced as {:?}", bonsai, maybe_sync_outcome
);
let maybe_target_cs_id =
extract_cs_id_from_sync_outcome(bonsai, maybe_sync_outcome)?;
derive_target_hg_changesets(ctx, maybe_target_cs_id, commit_syncer)
.await
}
})
.collect::<stream::futures_unordered::FuturesUnordered<_>>()
.try_fold(backsynced_so_far, {
move |backsynced_so_far, _| async move {
info!(
ctx.logger(),
"backsynced so far {} out of {}",
backsynced_so_far + 1,
total_to_backsync
);
Ok::<_, Error>(backsynced_so_far + 1)
}
})
});
runtime.block_on(f)?;
}
_ => {
bail!("unknown subcommand");
}
}
Ok(())
}