in spark/sql/dataframe.go [318:353]
// CorrWithMethod asks the server for the correlation between columns col1 and
// col2 of this DataFrame and returns it as a float64.
//
// It wraps df.relation in a StatCorr relation carrying the two column names
// and the requested method, executes the resulting plan through the session
// client, and reads the first cell of the first returned row.
//
// The method string is forwarded to the server unchanged; which values are
// accepted is decided server-side (presumably "pearson" — confirm against the
// Spark Connect version in use).
//
// An error is returned when plan execution fails, when the response cannot be
// converted into rows, or when the response does not hold a float64 value in
// its first cell (no rows, a null cell, or a cell of another type).
func (df *dataFrameImpl) CorrWithMethod(ctx context.Context, col1, col2 string, method string) (float64, error) {
	plan := &proto.Plan{
		OpType: &proto.Plan_Root{
			Root: &proto.Relation{
				Common: &proto.RelationCommon{
					PlanId: newPlanId(),
				},
				RelType: &proto.Relation_Corr{
					Corr: &proto.StatCorr{
						Input:  df.relation,
						Col1:   col1,
						Col2:   col2,
						Method: &method,
					},
				},
			},
		},
	}

	responseClient, err := df.session.client.ExecutePlan(ctx, plan)
	if err != nil {
		return 0, sparkerrors.WithType(fmt.Errorf("failed to execute plan: %w", err), sparkerrors.ExecutionError)
	}

	_, table, err := responseClient.ToTable()
	if err != nil {
		return 0, err
	}

	values, err := types.ReadArrowTableToRows(table)
	if err != nil {
		return 0, err
	}

	// Guard the index: an empty response must surface as an error, not as an
	// index-out-of-range panic inside library code.
	if len(values) == 0 {
		return 0, sparkerrors.WithType(
			fmt.Errorf("correlation of %q and %q returned no rows", col1, col2),
			sparkerrors.ExecutionError)
	}

	// Use the comma-ok assertion so a null or non-float64 cell is reported to
	// the caller instead of panicking.
	cell := values[0].At(0)
	corr, ok := cell.(float64)
	if !ok {
		return 0, sparkerrors.WithType(
			fmt.Errorf("correlation of %q and %q returned %T, expected float64", col1, col2, cell),
			sparkerrors.ExecutionError)
	}
	return corr, nil
}