in eden/mononoke/repo_client/src/client/mod.rs [651:910]
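/// Shared implementation behind the `getpack*` wire protocol commands: for
/// every requested `(path, filenodes)` pair it fetches file contents through
/// `handler` and file history in parallel, then serializes the results as a
/// wirepack stream.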
fn getpack<WeightedContent, Content, GetpackHandler>(
&self,
params: BoxStream<(MPath, Vec<HgFileNodeId>), Error>,
handler: GetpackHandler,
name: &'static str,
) -> BoxStream<BytesOld, Error>
where
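// `WeightedContent` resolves to a blob's size/weight info plus a second
// future (`Content`) for the actual bytes, which lets the stream below
// bound in-flight content fetches by total weight.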
WeightedContent: Future<Item = (GetpackBlobInfo, Content), Error = Error> + Send + 'static,
Content:
Future<Item = (HgFileNodeId, Bytes, Option<Metadata>), Error = Error> + Send + 'static,
GetpackHandler: Fn(CoreContext, BlobRepo, HgFileNodeId, SessionLfsParams, bool) -> WeightedContent
+ Send
+ 'static,
{
let allow_short_getpack_history = self.knobs.allow_short_getpack_history;
self.command_stream(name, UNSAMPLED, |ctx, command_logger| {
let undesired_path_logger =
try_boxstream!(UndesiredPathLogger::new(ctx.clone(), self.repo.blobrepo()));
let undesired_path_logger = Arc::new(undesired_path_logger);
// We buffer all parameters in memory so that we can log them.
// That shouldn't be a problem because requests are quite small.
let getpack_params = Arc::new(Mutex::new(vec![]));
let repo = self.repo.blobrepo().clone();
let lfs_params = self.lfs_params();
let hash_validation_percentage = tunables().get_hash_validation_percentage();
let validate_hash =
rand::thread_rng().gen_ratio(hash_validation_percentage as u32, 100);
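// Handle up to 500 paths at a time; actual content fetches are further
// bounded by the weight limit configured below.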
let getpack_buffer_size = 500;
let request_stream = move || {
let content_stream = {
cloned!(ctx, getpack_params, lfs_params, undesired_path_logger);
async move {
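// Bound buffering both by item count and by total blob weight so a
// few very large files cannot exhaust memory.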
let buffered_params = BufferedParams {
weight_limit: 100_000_000,
buffer_size: getpack_buffer_size,
};
// Let's fetch the whole request before responding.
// That prevents deadlocks: the hg client doesn't start reading the
// response until it has sent all of its arguments.
let params = params.compat().try_collect::<Vec<_>>().await?;
ctx.scuba()
.clone()
.add("getpack_paths", params.len())
.log_with_msg("Getpack Params", None);
let res = stream::iter(params.into_iter())
.map({
cloned!(ctx, getpack_params, repo, lfs_params);
move |(path, filenodes)| {
{
let mut getpack_params = getpack_params.lock().unwrap();
getpack_params.push((path.clone(), filenodes.clone()));
}
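// Account one unit of load per requested path so throttling can react.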
ctx.session().bump_load(Metric::GetpackFiles, 1.0);
let blob_futs: Vec<_> = filenodes
.iter()
.map(|filenode| {
handler(
ctx.clone(),
repo.clone(),
*filenode,
lfs_params.clone(),
validate_hash,
)
.compat()
})
.collect();
// NOTE: We don't otherwise await history_fut until we have the results
// from blob_futs, so we need to spawn this to start fetching history
// before we have resolved hg filenodes.
let history_fut = tokio::task::spawn(
get_unordered_file_history_for_multiple_nodes(
ctx.clone(),
repo.clone(),
filenodes.into_iter().collect(),
&path,
allow_short_getpack_history,
)
.compat()
.try_collect::<Vec<_>>(),
)
.flatten_err();
cloned!(undesired_path_logger);
async move {
let blobs =
future::try_join_all(blob_futs.into_iter()).await?;
undesired_path_logger.maybe_log_file(
Some(&path),
blobs.iter().map(|(blobinfo, _)| blobinfo.filesize),
);
let total_weight = blobs
.iter()
.map(|(blob_info, _)| blob_info.weight)
.sum();
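// Yield (future, weight) pairs: `try_buffered_weight_limited` below
// uses the weight to bound how many bytes are being fetched at once.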
let content_futs =
blobs.into_iter().map(|(_, fut)| fut.compat());
let contents_and_history = future::try_join(
future::try_join_all(content_futs),
history_fut,
)
.map_ok(move |(contents, history)| {
(path, contents, history)
});
Result::<_, Error>::Ok((contents_and_history, total_weight))
}
}
})
.buffered(getpack_buffer_size)
.try_buffered_weight_limited(buffered_params);
Result::<_, Error>::Ok(res)
}
}
.try_flatten_stream();
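// Enforce an overall timeout, yield periodically so a large response
// doesn't hog the executor, and serialize each (path, contents, history)
// triple into wirepack parts: a history section, then a data section.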
let serialized_stream = content_stream
.whole_stream_timeout(getpack_timeout())
.yield_periodically()
.flatten_err()
.boxed()
.compat()
.map({
cloned!(ctx);
move |(path, contents, history)| {
let mut res = vec![wirepack::Part::HistoryMeta {
path: RepoPath::FilePath(path.clone()),
entry_count: history.len() as u32,
}];
let history = history.into_iter().map(|history_entry| {
let (p1, p2, copy_from) = convert_parents_to_remotefilelog_format(
history_entry.parents(),
history_entry.copyfrom().as_ref(),
);
let linknode = history_entry.linknode().into_nodehash();
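// A null linknode means we could not find the commit that introduced
// this filenode; count these to track data quality.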
if linknode == NULL_HASH {
ctx.perf_counters()
.increment_counter(PerfCounterType::NullLinknode);
STATS::null_linknode_getpack.add_value(1);
}
wirepack::Part::History(wirepack::HistoryEntry {
node: history_entry.filenode().into_nodehash(),
p1: p1.into_nodehash(),
p2: p2.into_nodehash(),
linknode,
copy_from: copy_from.cloned().map(RepoPath::FilePath),
})
});
res.extend(history);
res.push(wirepack::Part::DataMeta {
path: RepoPath::FilePath(path),
entry_count: contents.len() as u32,
});
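// Every file is sent as a fulltext (delta against the null hash).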
for (filenode, content, metadata) in contents {
let content = content.to_vec();
let length = content.len() as u64;
ctx.perf_counters().set_max_counter(
PerfCounterType::GetpackMaxFileSize,
length as i64,
);
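// Track files that are large enough to have been served via LFS.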
if let Some(lfs_threshold) = lfs_params.threshold {
if length >= lfs_threshold {
ctx.perf_counters().add_to_counter(
PerfCounterType::GetpackPossibleLFSFilesSumSize,
length as i64,
);
ctx.perf_counters().increment_counter(
PerfCounterType::GetpackNumPossibleLFSFiles,
);
}
}
res.push(wirepack::Part::Data(wirepack::DataEntry {
node: filenode.into_nodehash(),
delta_base: NULL_HASH,
delta: Delta::new_fulltext(content),
metadata,
}));
}
stream_old::iter_ok(res.into_iter())
}
})
.flatten()
.chain(stream_old::once(Ok(wirepack::Part::End)));
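// Encode the parts into the binary wirepack format and emit raw bytes.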
wirepack::packer::WirePackPacker::new(serialized_stream, wirepack::Kind::File)
.and_then(|chunk| chunk.into_bytes())
.inspect({
cloned!(ctx);
move |bytes| {
let len = bytes.len() as i64;
ctx.perf_counters()
.add_to_counter(PerfCounterType::GetpackResponseSize, len);
STATS::total_fetched_file_size.add_value(len);
if ctx.session().is_quicksand() {
STATS::quicksand_fetched_file_size.add_value(len);
}
}
})
.boxify()
.compat()
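// Once the stream completes, record timing stats and log the buffered
// request parameters.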
.timed({
cloned!(ctx);
move |stats| {
STATS::getpack_ms
.add_value(stats.completion_time.as_millis_unchecked() as i64);
let encoded_params = {
let getpack_params = getpack_params.lock().unwrap();
let mut encoded_params: Vec<(String, Vec<String>)> = vec![];
for (path, filenodes) in getpack_params.iter() {
let mut encoded_filenodes = vec![];
for filenode in filenodes {
encoded_filenodes.push(format!("{}", filenode));
}
encoded_params.push((
String::from_utf8_lossy(&path.to_vec()).to_string(),
encoded_filenodes,
));
}
encoded_params
};
ctx.perf_counters().add_to_counter(
PerfCounterType::GetpackNumFiles,
encoded_params.len() as i64,
);
log_getpack_params_verbose(&ctx, &encoded_params);
command_logger.finalize_command(&stats);
future::ready(())
}
})
.boxed()
.compat()
};
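// Rate-limit the whole request based on the GetpackFiles load metric.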
throttle_stream(&self.session, Metric::GetpackFiles, name, request_stream).boxify()
})
}
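For context, a hedged sketch of how a concrete command could wrap this helper; `create_getpack_v1_blob` and the exact closure shape are assumptions based on the surrounding module, not verbatim from this excerpt:

// Illustrative only: a getpackv1-style wrapper. `create_getpack_v1_blob`
// (assumed helper) resolves one filenode to (GetpackBlobInfo, content
// future); getpackv1 carries no per-file metadata, hence the `None`.
fn getpackv1(
    &self,
    params: BoxStream<(MPath, Vec<HgFileNodeId>), Error>,
) -> BoxStream<BytesOld, Error> {
    self.getpack(
        params,
        |ctx, repo, filenode, _lfs_params, validate_hash| {
            create_getpack_v1_blob(ctx, repo, filenode, validate_hash).map(
                |(blobinfo, content)| {
                    // Attach `None` metadata to match the expected item type.
                    (blobinfo, content.map(|(id, bytes)| (id, bytes, None)))
                },
            )
        },
        "getpackv1",
    )
}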