fn getpack()

in eden/mononoke/repo_client/src/client/mod.rs [651:910]


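    /// Shared implementation behind the getpack wire-protocol commands: for
    /// each requested (path, filenodes) pair, fetch the file contents via
    /// `handler` and the file history, then encode both as a wirepack stream.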
    fn getpack<WeightedContent, Content, GetpackHandler>(
        &self,
        params: BoxStream<(MPath, Vec<HgFileNodeId>), Error>,
        handler: GetpackHandler,
        name: &'static str,
    ) -> BoxStream<BytesOld, Error>
    where
        WeightedContent: Future<Item = (GetpackBlobInfo, Content), Error = Error> + Send + 'static,
        Content:
            Future<Item = (HgFileNodeId, Bytes, Option<Metadata>), Error = Error> + Send + 'static,
        GetpackHandler: Fn(CoreContext, BlobRepo, HgFileNodeId, SessionLfsParams, bool) -> WeightedContent
            + Send
            + 'static,
    {
        let allow_short_getpack_history = self.knobs.allow_short_getpack_history;
        self.command_stream(name, UNSAMPLED, |ctx, command_logger| {
            let undesired_path_logger =
                try_boxstream!(UndesiredPathLogger::new(ctx.clone(), self.repo.blobrepo()));
            let undesired_path_logger = Arc::new(undesired_path_logger);
            // We buffer all parameters in memory so that we can log them.
            // That shouldn't be a problem, because requests are quite small.
            let getpack_params = Arc::new(Mutex::new(vec![]));
            let repo = self.repo.blobrepo().clone();

            let lfs_params = self.lfs_params();

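            // Decide up front whether to validate filenode hashes for this
            // request, sampling roughly `hash_validation_percentage` percent
            // of requests. Note that `gen_ratio` panics if the numerator
            // exceeds the denominator, so the tunable must stay in 0..=100.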
            let hash_validation_percentage = tunables().get_hash_validation_percentage();
            let validate_hash =
                rand::thread_rng().gen_ratio(hash_validation_percentage as u32, 100);
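            // Upper bound on how many (path, filenodes) entries are processed
            // concurrently; also reused as the buffer size for the
            // weight-limited buffering below.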
            let getpack_buffer_size = 500;

            let request_stream = move || {
                let content_stream = {
                    cloned!(ctx, getpack_params, lfs_params, undesired_path_logger);

                    async move {
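                        // Bound how much content may be buffered at once: at
                        // most `getpack_buffer_size` entries and 100 MB of
                        // total blob weight.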
                        let buffered_params = BufferedParams {
                            weight_limit: 100_000_000,
                            buffer_size: getpack_buffer_size,
                        };

                        // Let's fetch the whole request before responding.
                        // That prevents deadlocks: the hg client doesn't start reading
                        // the response until it has sent all of its arguments.
                        let params = params.compat().try_collect::<Vec<_>>().await?;

                        ctx.scuba()
                            .clone()
                            .add("getpack_paths", params.len())
                            .log_with_msg("Getpack Params", None);

                        let res = stream::iter(params.into_iter())
                            .map({
                                cloned!(ctx, getpack_params, repo, lfs_params);
                                move |(path, filenodes)| {
                                    {
                                        let mut getpack_params = getpack_params.lock().unwrap();
                                        getpack_params.push((path.clone(), filenodes.clone()));
                                    }

                                    ctx.session().bump_load(Metric::GetpackFiles, 1.0);

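                                    // Start a content fetch for every filenode of
                                    // this path. The handler returns a two-stage
                                    // future: the outer future resolves to blob
                                    // info (size and weight), the inner one to the
                                    // actual content.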
                                    let blob_futs: Vec<_> = filenodes
                                        .iter()
                                        .map(|filenode| {
                                            handler(
                                                ctx.clone(),
                                                repo.clone(),
                                                *filenode,
                                                lfs_params.clone(),
                                                validate_hash,
                                            )
                                            .compat()
                                        })
                                        .collect();

                                    // NOTE: We don't otherwise await history_fut until we have the results
                                    // from blob_futs, so we need to spawn this to start fetching history
                                    // before we have resolved hg filenodes.
                                    let history_fut = tokio::task::spawn(
                                        get_unordered_file_history_for_multiple_nodes(
                                            ctx.clone(),
                                            repo.clone(),
                                            filenodes.into_iter().collect(),
                                            &path,
                                            allow_short_getpack_history,
                                        )
                                        .compat()
                                        .try_collect::<Vec<_>>(),
                                    )
                                    .flatten_err();

                                    cloned!(undesired_path_logger);

                                    async move {
                                        let blobs =
                                            future::try_join_all(blob_futs.into_iter()).await?;

                                        undesired_path_logger.maybe_log_file(
                                            Some(&path),
                                            blobs.iter().map(|(blobinfo, _)| blobinfo.filesize),
                                        );

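                                        // The combined blob weight is charged
                                        // against the weight limit while this
                                        // entry's contents are buffered.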
                                        let total_weight = blobs
                                            .iter()
                                            .map(|(blob_info, _)| blob_info.weight)
                                            .sum();
                                        let content_futs =
                                            blobs.into_iter().map(|(_, fut)| fut.compat());
                                        let contents_and_history = future::try_join(
                                            future::try_join_all(content_futs),
                                            history_fut,
                                        )
                                        .map_ok(move |(contents, history)| {
                                            (path, contents, history)
                                        });

                                        Result::<_, Error>::Ok((contents_and_history, total_weight))
                                    }
                                }
                            })
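                            // Resolve blob infos with bounded concurrency, then
                            // drive the content and history futures subject to
                            // the weight limit.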
                            .buffered(getpack_buffer_size)
                            .try_buffered_weight_limited(buffered_params);

                        Result::<_, Error>::Ok(res)
                    }
                }
                .try_flatten_stream();

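                // Bound the whole response with a timeout and yield back to
                // the runtime periodically so a single large getpack cannot
                // starve other tasks.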
                let serialized_stream = content_stream
                    .whole_stream_timeout(getpack_timeout())
                    .yield_periodically()
                    .flatten_err()
                    .boxed()
                    .compat()
                    .map({
                        cloned!(ctx);
                        move |(path, contents, history)| {
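                            // Emit this file's wirepack parts: history
                            // metadata and entries first, then data metadata
                            // and full-text contents.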
                            let mut res = vec![wirepack::Part::HistoryMeta {
                                path: RepoPath::FilePath(path.clone()),
                                entry_count: history.len() as u32,
                            }];

                            let history = history.into_iter().map(|history_entry| {
                                let (p1, p2, copy_from) = convert_parents_to_remotefilelog_format(
                                    history_entry.parents(),
                                    history_entry.copyfrom().as_ref(),
                                );
                                let linknode = history_entry.linknode().into_nodehash();
                                if linknode == NULL_HASH {
                                    ctx.perf_counters()
                                        .increment_counter(PerfCounterType::NullLinknode);
                                    STATS::null_linknode_getpack.add_value(1);
                                }

                                wirepack::Part::History(wirepack::HistoryEntry {
                                    node: history_entry.filenode().into_nodehash(),
                                    p1: p1.into_nodehash(),
                                    p2: p2.into_nodehash(),
                                    linknode,
                                    copy_from: copy_from.cloned().map(RepoPath::FilePath),
                                })
                            });
                            res.extend(history);

                            res.push(wirepack::Part::DataMeta {
                                path: RepoPath::FilePath(path),
                                entry_count: contents.len() as u32,
                            });
                            for (filenode, content, metadata) in contents {
                                let content = content.to_vec();
                                let length = content.len() as u64;

                                ctx.perf_counters().set_max_counter(
                                    PerfCounterType::GetpackMaxFileSize,
                                    length as i64,
                                );

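                                // Count files at or above the LFS threshold,
                                // i.e. files that could have been served as
                                // LFS pointers instead.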
                                if let Some(lfs_threshold) = lfs_params.threshold {
                                    if length >= lfs_threshold {
                                        ctx.perf_counters().add_to_counter(
                                            PerfCounterType::GetpackPossibleLFSFilesSumSize,
                                            length as i64,
                                        );

                                        ctx.perf_counters().increment_counter(
                                            PerfCounterType::GetpackNumPossibleLFSFiles,
                                        );
                                    }
                                }

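                                // getpack always sends full texts: the delta
                                // base is the null hash.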
                                res.push(wirepack::Part::Data(wirepack::DataEntry {
                                    node: filenode.into_nodehash(),
                                    delta_base: NULL_HASH,
                                    delta: Delta::new_fulltext(content),
                                    metadata,
                                }));
                            }
                            stream_old::iter_ok(res.into_iter())
                        }
                    })
                    .flatten()
                    .chain(stream_old::once(Ok(wirepack::Part::End)));

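                // Encode the accumulated parts into the binary wirepack
                // format and convert each chunk into raw bytes.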
                wirepack::packer::WirePackPacker::new(serialized_stream, wirepack::Kind::File)
                    .and_then(|chunk| chunk.into_bytes())
                    .inspect({
                        cloned!(ctx);
                        move |bytes| {
                            let len = bytes.len() as i64;
                            ctx.perf_counters()
                                .add_to_counter(PerfCounterType::GetpackResponseSize, len);

                            STATS::total_fetched_file_size.add_value(len);
                            if ctx.session().is_quicksand() {
                                STATS::quicksand_fetched_file_size.add_value(len);
                            }
                        }
                    })
                    .boxify()
                    .compat()
                    .timed({
                        cloned!(ctx);
                        move |stats| {
                            STATS::getpack_ms
                                .add_value(stats.completion_time.as_millis_unchecked() as i64);
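                            // Stringify the captured request parameters for
                            // the verbose log below.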
                            let encoded_params = {
                                let getpack_params = getpack_params.lock().unwrap();
                                let mut encoded_params: Vec<(String, Vec<String>)> = vec![];
                                for (path, filenodes) in getpack_params.iter() {
                                    let mut encoded_filenodes = vec![];
                                    for filenode in filenodes {
                                        encoded_filenodes.push(format!("{}", filenode));
                                    }
                                    encoded_params.push((
                                        String::from_utf8_lossy(&path.to_vec()).to_string(),
                                        encoded_filenodes,
                                    ));
                                }
                                encoded_params
                            };

                            ctx.perf_counters().add_to_counter(
                                PerfCounterType::GetpackNumFiles,
                                encoded_params.len() as i64,
                            );

                            log_getpack_params_verbose(&ctx, &encoded_params);
                            command_logger.finalize_command(&stats);

                            future::ready(())
                        }
                    })
                    .boxed()
                    .compat()
            };

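            // Throttle the request based on the session's GetpackFiles load
            // before producing any data.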
            throttle_stream(&self.session, Metric::GetpackFiles, name, request_stream).boxify()
        })
    }