in interactive_engine/executor/ir/runtime/src/assembly.rs [197:961]
/// Installs the operators of `plan` onto `stream`, one after another, and returns the
/// resulting stream.
///
/// Operators that carry sub-plans (`Union`, `Apply`, `Join`, `Intersect`) and `PathExpand`
/// are assembled by calling `install` recursively on copied streams, sub-scopes or
/// internally rewritten plans.
///
/// # Errors
///
/// Returns a `BuildJobError` when an operator cannot be converted by `to_op_kind`, a
/// required protobuf field is empty, a range is invalid, the operator (or the operator
/// combination) is not supported, or an underlying stream/udf-generation call fails.
fn install(
    &self, mut stream: Stream<Record>, plan: &[pb::PhysicalOpr],
) -> Result<Stream<Record>, BuildJobError> {
    // Kind of the operator handled in the previous iteration; `PathExpand` consults it to
    // decide whether its base expansion needs a repartition.
    let mut prev_op_kind = pb::physical_opr::operator::OpKind::Root(pb::Root {});
    for op in plan {
        let op_kind = to_op_kind(op)?;
        match op_kind {
            OpKind::Repartition(repartition) => {
                let repartition_strategy = repartition.strategy.as_ref().ok_or_else(|| {
                    FnGenError::from(ParsePbError::EmptyFieldError(
                        "Empty repartition strategy".to_string(),
                    ))
                })?;
                match repartition_strategy {
                    pb::repartition::Strategy::ToAnother(shuffle) => {
                        let router = self.udf_gen.gen_shuffle(shuffle)?;
                        stream = stream.repartition(move |t| router.route(t));
                    }
                    pb::repartition::Strategy::ToOthers(_) => stream = stream.broadcast(),
                }
            }
            OpKind::Project(project) => {
                let func = self.udf_gen.gen_project(project)?;
                stream = stream.filter_map_with_name("Project", move |input| func.exec(input))?;
            }
            OpKind::Select(select) => {
                let func = self.udf_gen.gen_filter(select)?;
                stream = stream.filter(move |input| func.test(input))?;
            }
            OpKind::Unfold(unfold) => {
                let func = self.udf_gen.gen_unfold(unfold)?;
                stream = stream.flat_map_with_name("Unfold", move |input| func.exec(input))?;
            }
            OpKind::Limit(limit) => {
                let range = limit.range.ok_or_else(|| {
                    FnGenError::from(ParsePbError::EmptyFieldError("pb::Limit::range".to_string()))
                })?;
                // e.g., `limit(10)` would be translated as `Range{lower=0, upper=10}`;
                // only ranges starting from 0 with a positive upper bound are accepted.
                if range.upper <= range.lower || range.lower != 0 {
                    Err(FnGenError::from(ParsePbError::ParseError(format!(
                        "range {:?} in Limit Operator",
                        range
                    ))))?;
                }
                stream = stream.limit(range.upper as u32)?;
            }
            OpKind::OrderBy(order) => {
                let cmp = self.udf_gen.gen_cmp(order.clone())?;
                if let Some(range) = order.limit {
                    // same range restriction as in the Limit operator
                    if range.upper <= range.lower || range.lower != 0 {
                        Err(FnGenError::from(ParsePbError::ParseError(format!(
                            "range {:?} in Order Operator",
                            range
                        ))))?;
                    }
                    stream = stream.sort_limit_by(range.upper as u32, move |a, b| cmp.compare(a, b))?;
                } else {
                    stream = stream.sort_by(move |a, b| cmp.compare(a, b))?;
                }
            }
            OpKind::GroupBy(group) => {
                if group.mappings.is_empty() {
                    // fold case
                    let fold = self.udf_gen.gen_fold(group)?;
                    if let server_pb::AccumKind::Cnt = fold.get_accum_kind() {
                        // counting uses the dedicated `count` operator of the stream
                        let fold_map = fold.gen_fold_map()?;
                        stream = stream
                            .count()?
                            .map(move |cnt| fold_map.exec(cnt))?
                            .into_stream()?;
                    } else {
                        // TODO: optimize this by fold_partition + fold
                        let fold_accum = fold.gen_fold_accum()?;
                        stream = stream
                            .fold(fold_accum, || {
                                |mut accumulator, next| {
                                    accumulator.accum(next)?;
                                    Ok(accumulator)
                                }
                            })?
                            .map(move |mut accum| Ok(accum.finalize()?))?
                            .into_stream()?;
                    }
                } else {
                    // group case
                    let group = self.udf_gen.gen_group(group)?;
                    let group_key = group.gen_group_key()?;
                    let group_accum = group.gen_group_accum()?;
                    let group_map = group.gen_group_map()?;
                    stream = stream
                        .key_by(move |record| group_key.get_kv(record))?
                        .fold_partition_by_key(group_accum, || {
                            |mut accumulator, next| {
                                accumulator.accum(next)?;
                                Ok(accumulator)
                            }
                        })?
                        .unfold(|kv_map| {
                            // finalize every accumulator; the first failure aborts the unfold
                            Ok(kv_map
                                .into_iter()
                                .map(|(key, mut accumulator)| {
                                    accumulator.finalize().map(|value| (key, value))
                                })
                                .collect::<Result<Vec<_>, _>>()?
                                .into_iter())
                        })?
                        .map(move |key_value| group_map.exec(key_value))?;
                }
            }
            OpKind::Dedup(dedup) => {
                let selector = self.udf_gen.gen_dedup(dedup)?;
                stream = stream
                    .key_by(move |record| selector.get_kv(record))?
                    .dedup()?
                    .map(|pair| Ok(pair.value))?;
            }
            OpKind::Union(union) => {
                // A union without any sub plan is reported as an error instead of
                // panicking on an out-of-bounds index.
                let (first_subtask, other_subtasks) =
                    union.sub_plans.split_first().ok_or_else(|| {
                        FnGenError::from(ParsePbError::EmptyFieldError(
                            "pb::Union::sub_plans".to_string(),
                        ))
                    })?;
                let (mut ori_stream, sub_stream) = stream.copied()?;
                stream = self.install(sub_stream, &first_subtask.plan[..])?;
                for subtask in other_subtasks {
                    let copied = ori_stream.copied()?;
                    ori_stream = copied.0;
                    stream = self
                        .install(copied.1, &subtask.plan[..])?
                        .merge(stream)?;
                }
            }
            OpKind::Apply(apply) => {
                if apply.keys.is_empty() {
                    // apply
                    let apply_gen = self.udf_gen.gen_apply(apply.clone())?;
                    let join_kind = apply_gen.get_join_kind();
                    let join_func = apply_gen.gen_left_join_func()?;
                    let sub_task = apply.sub_plan.as_ref().ok_or_else(|| {
                        BuildJobError::Unsupported("Task is missing in Apply".to_string())
                    })?;
                    stream = match join_kind {
                        // keep the parent only if the sub task produces something
                        JoinKind::Semi => stream
                            .apply(|sub_start| {
                                let has_sub = self
                                    .install(sub_start, &sub_task.plan[..])?
                                    .any()?;
                                Ok(has_sub)
                            })?
                            .filter_map(
                                move |(parent, has_sub)| {
                                    if has_sub {
                                        Ok(Some(parent))
                                    } else {
                                        Ok(None)
                                    }
                                },
                            )?,
                        // keep the parent only if the sub task produces nothing
                        JoinKind::Anti => stream
                            .apply(|sub_start| {
                                let has_sub = self
                                    .install(sub_start, &sub_task.plan[..])?
                                    .any()?;
                                Ok(has_sub)
                            })?
                            .filter_map(
                                move |(parent, has_sub)| {
                                    if has_sub {
                                        Ok(None)
                                    } else {
                                        Ok(Some(parent))
                                    }
                                },
                            )?,
                        // collect the sub task results and hand them to the join function
                        JoinKind::Inner | JoinKind::LeftOuter => stream
                            .apply(|sub_start| {
                                let sub_end = self
                                    .install(sub_start, &sub_task.plan[..])?
                                    .collect::<Vec<Record>>()?;
                                Ok(sub_end)
                            })?
                            .filter_map(move |(parent, sub)| join_func.exec(parent, sub))?,
                        _ => Err(BuildJobError::Unsupported(format!(
                            "Do not support join_kind {:?} in Apply",
                            join_kind
                        )))?,
                    };
                } else {
                    // segment apply
                    Err(FnGenError::unsupported_error("SegmentApply Operator"))?
                }
            }
            OpKind::Join(join) => {
                let joiner = self.udf_gen.gen_join(join.clone())?;
                let left_key_selector = joiner.gen_left_kv_fn()?;
                let right_key_selector = joiner.gen_right_kv_fn()?;
                let join_kind = joiner.get_join_kind();
                let left_task = join
                    .left_plan
                    .as_ref()
                    .ok_or_else(|| FnGenError::ParseError("left_task is missing in merge".into()))?;
                let right_task = join
                    .right_plan
                    .as_ref()
                    .ok_or_else(|| FnGenError::ParseError("right_task is missing in merge".into()))?;
                // both sides start from a copy of the current stream and are keyed
                // by their respective join keys
                let (left_stream, right_stream) = stream.copied()?;
                let left_stream = self
                    .install(left_stream, &left_task.plan[..])?
                    .key_by(move |record| left_key_selector.get_kv(record))?;
                let right_stream = self
                    .install(right_stream, &right_task.plan[..])?
                    .key_by(move |record| right_key_selector.get_kv(record))?;
                stream = match join_kind {
                    JoinKind::Inner => left_stream
                        .inner_join(right_stream)?
                        .map(|(left, right)| Ok(left.value.join(right.value, None)))?,
                    JoinKind::LeftOuter => {
                        left_stream
                            .left_outer_join(right_stream)?
                            .map(|(left, right)| {
                                let left = left.ok_or_else(|| {
                                    FnExecError::unexpected_data_error(
                                        "left is None in left outer join",
                                    )
                                })?;
                                if let Some(right) = right {
                                    // TODO(bingqing): Specify HeadJoinOpt if necessary
                                    Ok(left.value.join(right.value, None))
                                } else {
                                    Ok(left.value)
                                }
                            })?
                    }
                    JoinKind::RightOuter => left_stream
                        .right_outer_join(right_stream)?
                        .map(|(left, right)| {
                            let right = right.ok_or_else(|| {
                                FnExecError::unexpected_data_error("right is None in right outer join")
                            })?;
                            if let Some(left) = left {
                                Ok(left.value.join(right.value, None))
                            } else {
                                Ok(right.value)
                            }
                        })?,
                    JoinKind::FullOuter => {
                        left_stream
                            .full_outer_join(right_stream)?
                            .map(|(left, right)| match (left, right) {
                                (Some(left), Some(right)) => Ok(left.value.join(right.value, None)),
                                (Some(left), None) => Ok(left.value),
                                (None, Some(right)) => Ok(right.value),
                                // Reported as an execution error (like the other outer
                                // joins) rather than panicking inside the worker.
                                (None, None) => Err(FnExecError::unexpected_data_error(
                                    "both left and right are None in full outer join",
                                ))?,
                            })?
                    }
                    JoinKind::Semi => left_stream
                        .semi_join(right_stream)?
                        .map(|left| Ok(left.value))?,
                    JoinKind::Anti => left_stream
                        .anti_join(right_stream)?
                        .map(|left| Ok(left.value))?,
                    JoinKind::Times => Err(BuildJobError::Unsupported(
                        "JoinKind of Times is not supported yet".to_string(),
                    ))?,
                }
            }
            OpKind::Intersect(intersect) => {
                // The subplan in intersect can be:
                // 1) (repartition) + EdgeExpand (ExpandV), which is to expand and intersect on id-only vertices;
                //    or (repartition) + EdgeExpand (ExpandV) + (repartition) + GetV(Itself), which is to expand
                //    and intersect on vertices. In this case, GetV(Itself) usually stands for further filtering
                //    on the intersected vertices.
                // 2) (repartition) + EdgeExpand (ExpandE) + GetV(Adj), which is to expand and intersect on vertices.
                //    In this case, EdgeExpand and GetV are not fused, usually with alias in EdgeExpand;
                //    or (repartition) + EdgeExpand (ExpandE) + GetV(Adj) + (repartition) + GetV(Itself), which is
                //    to expand and intersect on vertices. In this case, EdgeExpand and GetV are not fused, usually
                //    with alias in EdgeExpand; and GetV(Itself) usually stands for further filtering on the
                //    intersected vertices.
                // 3) (repartition) + PathExpand + GetV(EndV), which is to expand paths and intersect on the end
                //    vertices; or (repartition) + PathExpand + GetV(EndV) + (repartition) + GetV(Itself), where
                //    GetV(Itself) usually stands for further filtering on the intersected vertices.
                // Specifically, we slightly modify the plan due to the implementation, as follows:
                // 1) basically, we extract the last edge_expand step (ExpandV or ExpandE+GetV) of every subplan
                //    for intersection;
                // 2) the intersect result is a collection, so we need to add an Unfold op after intersect;
                // 3) if there are further filters on the intersected vertices on a distributed graph database,
                //    we need to add a repartition + Auxilia op after Unfold to filter the intersected vertices;
                // 4) for the cases of PathExpand, we need to pre-expand the path and endV, and then intersect on
                //    the last edge_expand;
                // 5) if the edge_expands to intersect are all ExpandV, we can apply the optimized intersect
                //    implementation, i.e., ExpandIntersect, which won't preserve any edges during the intersect.
                let mut intersected_expands = vec![];
                let mut pre_expands = vec![];
                let mut auxilia: Option<pb::GetV> = None;
                let mut auxilia_repartition = None;
                for mut subplan in intersect.sub_plans {
                    // untouched copy of the subplan, only used for error messages
                    let subplan_clone = subplan.clone();
                    let mut last_op = subplan.plan.pop().ok_or_else(|| {
                        FnGenError::from(ParsePbError::EmptyFieldError(
                            "subplan in pb::Intersect::plan".to_string(),
                        ))
                    })?;
                    // if the last opr is Auxilia, move it after intersect
                    if let OpKind::Vertex(mut vertex) = to_op_kind(&last_op)? {
                        if vertex.opt == pb::get_v::VOpt::Itself as i32 {
                            vertex.tag = Some(intersect.key);
                            auxilia = Some(vertex.clone());
                            if subplan
                                .plan
                                .last()
                                .map(|op| op.is_repartition())
                                .unwrap_or(false)
                            {
                                auxilia_repartition = subplan.plan.pop();
                            }
                            last_op = subplan.plan.pop().ok_or_else(|| {
                                FnGenError::unsupported_error(&format!(
                                    "subplan with only getV in pb::Intersect::plan {:?}",
                                    vertex,
                                ))
                            })?;
                        }
                    }
                    // then, process subplans after removing the last Auxilia
                    let last_op_kind = to_op_kind(&last_op)?;
                    match last_op_kind {
                        // case 1: EdgeExpandV
                        OpKind::Edge(mut expand) => {
                            expand.alias = Some(intersect.key.clone());
                            if let Some(opr) = subplan.plan.last() {
                                if opr.is_repartition() {
                                    intersected_expands.push((subplan.plan.pop(), expand, None));
                                } else {
                                    Err(FnGenError::unsupported_error(&format!(
                                        "Subplan in Intersection in EdgeExpandV {:?}",
                                        PhysicalPlanPrinter(&subplan_clone),
                                    )))?
                                }
                            } else {
                                intersected_expands.push((None, expand, None));
                            }
                        }
                        // case 2/3: PathExpand/EdgeExpand + GetV
                        OpKind::Vertex(mut get_v) => {
                            let prev_opr_kind = to_op_kind(&subplan.plan.pop().ok_or_else(|| {
                                FnGenError::unsupported_error(&format!(
                                    "subplan with only getV in pb::Intersect::plan {:?}",
                                    get_v,
                                ))
                            })?)?;
                            match prev_opr_kind {
                                OpKind::Edge(edge_expand) => {
                                    // case2: ExpandE + GetV(Adj)
                                    if get_v.opt == pb::get_v::VOpt::Itself as i32 {
                                        Err(FnGenError::unsupported_error(&format!(
                                            "Subplan in Intersection in EdgeExpandE+GetV {:?}",
                                            PhysicalPlanPrinter(&subplan_clone),
                                        )))?
                                    }
                                    // note that this get_v won't take filters, as it should be translated to auxilia.
                                    if let Some(params) = &get_v.params {
                                        if params.has_predicates() || params.has_columns() {
                                            Err(FnGenError::unsupported_error(&format!(
                                                "Subplan in Intersection in EdgeExpandE+GetV {:?}",
                                                PhysicalPlanPrinter(&subplan_clone),
                                            )))?
                                        }
                                    }
                                    if let Some(opr) = subplan.plan.last() {
                                        if opr.is_repartition() {
                                            intersected_expands.push((
                                                subplan.plan.pop(),
                                                edge_expand,
                                                Some(get_v),
                                            ));
                                        } else {
                                            Err(FnGenError::unsupported_error(&format!(
                                                "Subplan in Intersection in EdgeExpandE+GetV {:?}",
                                                PhysicalPlanPrinter(&subplan_clone),
                                            )))?
                                        }
                                    } else {
                                        intersected_expands.push((None, edge_expand, Some(get_v)));
                                    }
                                }
                                OpKind::Path(mut path_expand) => {
                                    // case3: PathExpand + GetV(EndV)
                                    if get_v.opt != pb::get_v::VOpt::End as i32 {
                                        Err(FnGenError::unsupported_error(&format!(
                                            "Subplan in Intersection in PathExpand + GetV {:?}",
                                            PhysicalPlanPrinter(&subplan_clone),
                                        )))?
                                    }
                                    let path_repartition = if let Some(opr) = subplan.plan.last() {
                                        if opr.is_repartition() {
                                            subplan.plan.pop()
                                        } else {
                                            Err(FnGenError::unsupported_error(&format!(
                                                "Subplan in Intersection in PathExpand + GetV {:?}",
                                                PhysicalPlanPrinter(&subplan_clone),
                                            )))?
                                        }
                                    } else {
                                        None
                                    };
                                    // the case of expand paths and intersect on the end vertices
                                    // Process path_expand as follows:
                                    // 1. If path_expand ranges from 0, it is unsupported;
                                    // 2. If it is path_expand(1,2), optimize it as edge_expand;
                                    // 3. Otherwise, translate path_expand(l,h) to path_expand(l-1, h-1) + endV() + edge_expand,
                                    //    and the last edge_expand is the one to intersect.
                                    // Notice that if we have predicates for vertices in path_expand, or for the last vertex
                                    // of path_expand, do the filtering after intersection.
                                    // TODO: there might be a bug here:
                                    // if path_expand has an alias which indicates that the path would be referred to later,
                                    // it may not work as expected.
                                    let path_expand_base =
                                        path_expand.base.as_ref().ok_or_else(|| {
                                            FnGenError::ParseError(
                                                "PathExpand::base in Pattern is empty".into(),
                                            )
                                        })?;
                                    let base_edge_expand = path_expand_base
                                        .edge_expand
                                        .as_ref()
                                        .ok_or_else(|| {
                                            FnGenError::ParseError(
                                                "PathExpand::base::edge_expand is empty".into(),
                                            )
                                        })?;
                                    // currently only expand_opt = ExpandV is supported
                                    if base_edge_expand.expand_opt
                                        != pb::edge_expand::ExpandOpt::Vertex as i32
                                    {
                                        Err(FnGenError::unsupported_error(&format!(
                                            "PathExpand in Intersection with expand {:?}",
                                            base_edge_expand
                                        )))?
                                    }
                                    // pick the last edge expand out from the path expand
                                    let hop_range =
                                        path_expand.hop_range.as_mut().ok_or_else(|| {
                                            FnGenError::ParseError(
                                                "pb::PathExpand::hop_range is empty".into(),
                                            )
                                        })?;
                                    if hop_range.lower < 1 {
                                        Err(FnGenError::unsupported_error(&format!(
                                            "PathExpand in Intersection with lower range of {:?}",
                                            hop_range.lower
                                        )))?
                                    }
                                    let mut edge_expand = base_edge_expand.clone();
                                    let mut edge_repartition = None;
                                    if hop_range.lower == 1 && hop_range.upper == 2 {
                                        // optimize Path(1..2) as EdgeExpand
                                        edge_expand.v_tag = path_expand.start_tag;
                                        edge_expand.alias = get_v.alias;
                                        edge_repartition = path_repartition;
                                    } else {
                                        // translate path_expand(l,h) to path_expand(l-1, h-1) + endV() + edge_expand,
                                        edge_expand.v_tag = None;
                                        // edge expand should carry endv's alias, which is the intersect key.
                                        edge_expand.alias = get_v.alias.clone();
                                        get_v.alias.take();
                                        hop_range.lower -= 1;
                                        hop_range.upper -= 1;
                                        // pre expand path_expand(l-1, h-1)
                                        let has_path_repartition = path_repartition.is_some();
                                        if let Some(repartition) = path_repartition {
                                            pre_expands.push(repartition);
                                        }
                                        pre_expands.push(path_expand.into());
                                        pre_expands.push(get_v.into());
                                        // the extracted edge_expand is repartitioned only if the
                                        // path_expand was
                                        if has_path_repartition {
                                            edge_repartition = Some(
                                                pb::Repartition {
                                                    strategy: Some(
                                                        pb::repartition::Strategy::ToAnother(
                                                            pb::repartition::Shuffle {
                                                                shuffle_key: None,
                                                            },
                                                        ),
                                                    ),
                                                }
                                                .into(),
                                            );
                                        }
                                    }
                                    // and then expand and intersect on the last edge_expand
                                    intersected_expands.push((edge_repartition, edge_expand, None));
                                }
                                _ => Err(FnGenError::unsupported_error(&format!(
                                    "Subplan in Intersection to intersect: {:?}",
                                    PhysicalPlanPrinter(&subplan_clone),
                                )))?,
                            }
                        }
                        _ => Err(FnGenError::unsupported_error(&format!(
                            "Opr in Intersection to intersect: {:?}",
                            last_op_kind
                        )))?,
                    }
                }
                // pre-expanding for the path_expand case
                if !pre_expands.is_empty() {
                    stream = self.install(stream, &pre_expands)?;
                }
                // process intersect of edge_expands; the optimized implementation is used
                // only when no subplan carries a separate GetV
                let is_optimized = intersected_expands
                    .iter()
                    .all(|(_, _, get_v)| get_v.is_none());
                let mut intersect_expand_funcs = Vec::with_capacity(intersected_expands.len());
                for (repartition, expand, get_v) in intersected_expands {
                    let expand_func = if !is_optimized {
                        self.udf_gen
                            .gen_general_edge_expand_collection(expand, get_v)?
                    } else {
                        self.udf_gen
                            .gen_edge_expand_collection(expand)?
                    };
                    intersect_expand_funcs.push((repartition, expand_func));
                }
                // intersect of edge_expands
                for (repartition, expand_intersect_func) in intersect_expand_funcs {
                    if let Some(repartition) = repartition {
                        stream = self.install(stream, &[repartition])?;
                    }
                    stream = stream.filter_map_with_name("ExpandIntersect", move |input| {
                        expand_intersect_func.exec(input)
                    })?;
                }
                // unfold the intersection
                let unfold =
                    pb::Unfold { tag: Some(intersect.key.into()), alias: Some(intersect.key.into()) };
                stream = self.install(stream, &[unfold.into()])?;
                // add vertex filters
                if let Some(mut auxilia) = auxilia {
                    auxilia.tag = Some(intersect.key.into());
                    if let Some(auxilia_repartition) = auxilia_repartition {
                        stream = self.install(stream, &[auxilia_repartition, auxilia.into()])?;
                    } else {
                        stream = self.install(stream, &[auxilia.into()])?;
                    }
                }
            }
            OpKind::Vertex(vertex) => {
                // `opt` is the raw i32 carried by the protobuf message; compare it with the
                // enum discriminant instead of transmuting it, which would be undefined
                // behaviour for a value that is not a valid variant.
                if vertex.opt == algebra_pb::get_v::VOpt::Both as i32 {
                    let func = self.udf_gen.gen_both_vertex(vertex)?;
                    stream = stream.flat_map_with_name("GetV", move |input| func.exec(input))?;
                } else {
                    let func = self.udf_gen.gen_vertex(vertex)?;
                    stream = stream.filter_map_with_name("GetV", move |input| func.exec(input))?;
                }
            }
            OpKind::Edge(edge) => {
                let func = self.udf_gen.gen_edge_expand(edge)?;
                stream = stream.flat_map_with_name("EdgeExpand", move |input| func.exec(input))?;
            }
            OpKind::Path(path) => {
                let mut base = path.base.clone().ok_or_else(|| {
                    FnGenError::from(ParsePbError::EmptyFieldError("pb::PathExpand::base".to_string()))
                })?;
                let range = path.hop_range.as_ref().ok_or_else(|| {
                    FnGenError::from(ParsePbError::EmptyFieldError(
                        "pb::PathExpand::hop_range".to_string(),
                    ))
                })?;
                if range.upper <= range.lower || range.lower < 0 || range.upper <= 0 {
                    Err(FnGenError::from(ParsePbError::ParseError(format!(
                        "range {:?} in PathExpand Operator",
                        range
                    ))))?;
                }
                // path start
                let path_start_func = self.udf_gen.gen_path_start(path.clone())?;
                stream = stream
                    .filter_map_with_name("PathStart", move |input| path_start_func.exec(input))?;
                // path base expand
                let mut base_expand_plan = vec![];
                // process edge_expand
                let edge_expand = base.edge_expand.take().ok_or_else(|| {
                    FnGenError::from(ParsePbError::ParseError(format!(
                        "empty EdgeExpand of ExpandBase in PathExpand Operator {:?}",
                        base
                    )))
                })?;
                // The option fields are raw i32 values from protobuf; they are compared with
                // the enum discriminants rather than transmuted into the enums.
                let needs_edges = path.result_opt == pb::path_expand::ResultOpt::AllVE as i32
                    || path.path_opt == pb::path_expand::PathOpt::Trail as i32;
                let expands_vertex =
                    edge_expand.expand_opt == pb::edge_expand::ExpandOpt::Vertex as i32;
                if needs_edges && expands_vertex {
                    // the case when base expand is expand vertex, but needs to expand edges + vertices
                    // since the result opt is ALLVE (or the path opt is Trail)
                    // TODO: in the new compilation stack, this case will not happen.
                    let mut edge_expand_e = edge_expand.clone();
                    edge_expand_e.expand_opt = pb::edge_expand::ExpandOpt::Edge as i32;
                    let alias = edge_expand_e.alias.take();
                    let get_v =
                        pb::GetV { opt: pb::get_v::VOpt::Other as i32, tag: None, params: None, alias };
                    base_expand_plan.push(edge_expand_e.into());
                    base_expand_plan.push(get_v.into());
                } else {
                    base_expand_plan.push(edge_expand.into());
                }
                let repartition = if let OpKind::Repartition(_) = &prev_op_kind {
                    // the case when base expand needs repartition
                    Some(
                        pb::Repartition {
                            strategy: Some(pb::repartition::Strategy::ToAnother(
                                pb::repartition::Shuffle { shuffle_key: None },
                            )),
                        }
                        .into(),
                    )
                } else {
                    None
                };
                // process get_v
                if let Some(mut getv) = base.get_v.take() {
                    if (pb::get_v::VOpt::Itself as i32) == getv.opt {
                        // the case of expandv + auxilia (to deal with filtering on vertices).
                        if let Some(repartition) = repartition {
                            base_expand_plan.push(repartition);
                        }
                        base_expand_plan.push(getv.into());
                    } else {
                        // the case of expande + getv
                        // specifically, if getv has predicates or columns,
                        // separate it as getv(getAdj) + auxilia (for filter/get columns)
                        let needs_separate = getv
                            .params
                            .as_ref()
                            .map_or(false, |params| params.has_predicates() || params.has_columns());
                        if needs_separate {
                            let alias = getv.alias.take();
                            let params = getv.params.take();
                            let auxilia = pb::GetV {
                                opt: pb::get_v::VOpt::Itself as i32,
                                tag: None,
                                params,
                                alias,
                            };
                            base_expand_plan.push(getv.into());
                            if let Some(repartition) = repartition {
                                base_expand_plan.push(repartition);
                            }
                            base_expand_plan.push(auxilia.into());
                        } else {
                            base_expand_plan.push(getv.into());
                            if let Some(repartition) = repartition {
                                base_expand_plan.push(repartition);
                            }
                        }
                    }
                } else {
                    // the case of expandv
                    if let Some(repartition) = repartition {
                        base_expand_plan.push(repartition);
                    }
                }
                // the first `lower` hops are mandatory
                for _ in 0..range.lower {
                    stream = self.install(stream, &base_expand_plan)?;
                }
                // number of optional hops left after the mandatory ones
                let times = range.upper - range.lower - 1;
                if times > 0 {
                    if path.condition.is_some() {
                        let mut until = IterCondition::max_iters(times as u32);
                        let func = self.udf_gen.gen_path_condition(path.clone())?;
                        until.set_until(func);
                        // Notice that if UNTIL condition set, we expand path without `Emit`
                        stream = stream
                            .iterate_until(until, |start| self.install(start, &base_expand_plan[..]))?;
                    } else {
                        // emit the result of every hop: each hop is expanded from the
                        // previous one and merged into the output stream
                        let (mut hop_stream, copied_stream) = stream.copied()?;
                        stream = copied_stream;
                        for _ in 0..times {
                            hop_stream = self.install(hop_stream, &base_expand_plan[..])?;
                            let copied = hop_stream.copied()?;
                            hop_stream = copied.0;
                            stream = stream.merge(copied.1)?;
                        }
                    }
                }
                // path end to add path_alias if exists
                if path.alias.is_some() {
                    let path_end_func = self.udf_gen.gen_path_end(path)?;
                    stream = stream.map_with_name("PathEnd", move |input| path_end_func.exec(input))?;
                }
            }
            OpKind::Scan(scan) => {
                let udf_gen = self.udf_gen.clone();
                stream = stream.flat_map(move |_| {
                    let scan_iter = udf_gen.gen_source(scan.clone().into());
                    Ok(scan_iter?)
                })?;
            }
            OpKind::Sample(sample) => {
                if let Some(sample_weight) = &sample.sample_weight {
                    if sample_weight.tag.is_some() || sample_weight.property.is_some() {
                        Err(FnGenError::from(ParsePbError::ParseError(
                            "sample_weight is not supported yet".to_string(),
                        )))?;
                    }
                }
                if let Some(sample_type) = &sample.sample_type {
                    match &sample_type.inner {
                        // the case of Coin
                        Some(algebra_pb::sample::sample_type::Inner::SampleByRatio(_)) => {
                            let func = self.udf_gen.gen_coin(sample)?;
                            stream = stream.filter(move |input| func.test(input))?;
                        }
                        // the case of Sample
                        Some(algebra_pb::sample::sample_type::Inner::SampleByNum(_)) => {
                            let partial_sample_accum = self.udf_gen.gen_sample(sample)?;
                            let sample_accum = partial_sample_accum.clone();
                            // sample within each partition first, then sample again over
                            // the partial results
                            stream = stream
                                .fold_partition(partial_sample_accum, move || {
                                    move |mut sample_accum, next| {
                                        sample_accum.accum(next)?;
                                        Ok(sample_accum)
                                    }
                                })?
                                .unfold(move |mut sample_accum| Ok(sample_accum.finalize()?))?
                                .fold(sample_accum, move || {
                                    move |mut sample_accum, next| {
                                        sample_accum.accum(next)?;
                                        Ok(sample_accum)
                                    }
                                })?
                                .unfold(move |mut sample_accum| Ok(sample_accum.finalize()?))?
                        }
                        None => Err(FnGenError::from(ParsePbError::EmptyFieldError(
                            "pb::Sample::sample_type.inner".to_string(),
                        )))?,
                    }
                } else {
                    Err(FnGenError::from(ParsePbError::EmptyFieldError(
                        "pb::Sample::sample_type".to_string(),
                    )))?;
                }
            }
            OpKind::Root(_) => {
                // do nothing, as it is a dummy node
            }
            OpKind::Sink(_) => {
                // this would be processed in assemble, and cannot be reached when install.
                Err(FnGenError::unsupported_error("unreachable sink in install"))?
            }
            OpKind::ProcedureCall(procedure_call) => Err(FnGenError::unsupported_error(&format!(
                "ProcedureCall Operator {:?}",
                procedure_call
            )))?,
        }
        // `op_kind` was consumed by the match above, so the kind is derived again
        prev_op_kind = to_op_kind(op)?;
    }
    Ok(stream)
}