in sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala [132:243]
/**
 * Transforms `rel` into a logical plan without requesting plan caching.
 *
 * Convenience overload: delegates to the two-argument variant with `cachePlan` set to false.
 *
 * @param rel
 *   The relation to transform.
 * @return
 *   The resolved logical plan.
 */
def transformRelation(rel: proto.Relation): LogicalPlan = {
  val noCaching = false
  transformRelation(rel, cachePlan = noCaching)
}
/**
 * The root of the query plan is a relation and we apply the transformations to it.
 *
 * Dispatches on the relation type of `rel` to the matching `transform*` method. If the relation
 * carries a plan id in its common metadata, the resulting plan is tagged with it under
 * `LogicalPlan.PLAN_ID_TAG`.
 *
 * Errors: an `IndexOutOfBoundsException` is thrown when no relation type is set on `rel`, and an
 * `InvalidPlanInput` error naming the relation type is thrown for any relation type this method
 * does not handle.
 *
 * @param rel
 *   The relation to transform.
 * @param cachePlan
 *   Set to true for a performance optimization, if the plan is likely to be reused, e.g. built
 *   upon by further dataset transformation. The default is false.
 * @return
 *   The resolved logical plan.
 */
@DeveloperApi
def transformRelation(rel: proto.Relation, cachePlan: Boolean): LogicalPlan = {
  sessionHolder.usePlanCache(rel, cachePlan) { rel =>
    val plan = rel.getRelTypeCase match {
      // DataFrame API
      case proto.Relation.RelTypeCase.SHOW_STRING => transformShowString(rel.getShowString)
      case proto.Relation.RelTypeCase.HTML_STRING => transformHtmlString(rel.getHtmlString)
      case proto.Relation.RelTypeCase.READ => transformReadRel(rel.getRead)
      case proto.Relation.RelTypeCase.PROJECT => transformProject(rel.getProject)
      case proto.Relation.RelTypeCase.FILTER => transformFilter(rel.getFilter)
      case proto.Relation.RelTypeCase.LIMIT => transformLimit(rel.getLimit)
      case proto.Relation.RelTypeCase.OFFSET => transformOffset(rel.getOffset)
      case proto.Relation.RelTypeCase.TAIL => transformTail(rel.getTail)
      case proto.Relation.RelTypeCase.JOIN => transformJoinOrJoinWith(rel.getJoin)
      case proto.Relation.RelTypeCase.AS_OF_JOIN => transformAsOfJoin(rel.getAsOfJoin)
      case proto.Relation.RelTypeCase.LATERAL_JOIN => transformLateralJoin(rel.getLateralJoin)
      case proto.Relation.RelTypeCase.DEDUPLICATE => transformDeduplicate(rel.getDeduplicate)
      case proto.Relation.RelTypeCase.SET_OP => transformSetOperation(rel.getSetOp)
      case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort)
      case proto.Relation.RelTypeCase.DROP => transformDrop(rel.getDrop)
      case proto.Relation.RelTypeCase.AGGREGATE => transformAggregate(rel.getAggregate)
      case proto.Relation.RelTypeCase.SQL => transformSql(rel.getSql)
      case proto.Relation.RelTypeCase.WITH_RELATIONS =>
        transformWithRelations(rel.getWithRelations)
      case proto.Relation.RelTypeCase.LOCAL_RELATION =>
        transformLocalRelation(rel.getLocalRelation)
      case proto.Relation.RelTypeCase.SAMPLE => transformSample(rel.getSample)
      case proto.Relation.RelTypeCase.RANGE => transformRange(rel.getRange)
      case proto.Relation.RelTypeCase.SUBQUERY_ALIAS =>
        transformSubqueryAlias(rel.getSubqueryAlias)
      case proto.Relation.RelTypeCase.REPARTITION => transformRepartition(rel.getRepartition)
      case proto.Relation.RelTypeCase.FILL_NA => transformNAFill(rel.getFillNa)
      case proto.Relation.RelTypeCase.DROP_NA => transformNADrop(rel.getDropNa)
      case proto.Relation.RelTypeCase.REPLACE => transformReplace(rel.getReplace)
      case proto.Relation.RelTypeCase.SUMMARY => transformStatSummary(rel.getSummary)
      case proto.Relation.RelTypeCase.DESCRIBE => transformStatDescribe(rel.getDescribe)
      case proto.Relation.RelTypeCase.COV => transformStatCov(rel.getCov)
      case proto.Relation.RelTypeCase.CORR => transformStatCorr(rel.getCorr)
      case proto.Relation.RelTypeCase.APPROX_QUANTILE =>
        transformStatApproxQuantile(rel.getApproxQuantile)
      case proto.Relation.RelTypeCase.CROSSTAB =>
        transformStatCrosstab(rel.getCrosstab)
      case proto.Relation.RelTypeCase.FREQ_ITEMS => transformStatFreqItems(rel.getFreqItems)
      case proto.Relation.RelTypeCase.SAMPLE_BY =>
        transformStatSampleBy(rel.getSampleBy)
      case proto.Relation.RelTypeCase.TO_SCHEMA => transformToSchema(rel.getToSchema)
      case proto.Relation.RelTypeCase.TO_DF =>
        transformToDF(rel.getToDf)
      case proto.Relation.RelTypeCase.WITH_COLUMNS_RENAMED =>
        transformWithColumnsRenamed(rel.getWithColumnsRenamed)
      case proto.Relation.RelTypeCase.WITH_COLUMNS => transformWithColumns(rel.getWithColumns)
      case proto.Relation.RelTypeCase.WITH_WATERMARK =>
        transformWithWatermark(rel.getWithWatermark)
      case proto.Relation.RelTypeCase.CACHED_LOCAL_RELATION =>
        transformCachedLocalRelation(rel.getCachedLocalRelation)
      case proto.Relation.RelTypeCase.HINT => transformHint(rel.getHint)
      case proto.Relation.RelTypeCase.UNPIVOT => transformUnpivot(rel.getUnpivot)
      case proto.Relation.RelTypeCase.TRANSPOSE => transformTranspose(rel.getTranspose)
      case proto.Relation.RelTypeCase.UNRESOLVED_TABLE_VALUED_FUNCTION =>
        transformUnresolvedTableValuedFunction(rel.getUnresolvedTableValuedFunction)
      case proto.Relation.RelTypeCase.REPARTITION_BY_EXPRESSION =>
        transformRepartitionByExpression(rel.getRepartitionByExpression)
      case proto.Relation.RelTypeCase.MAP_PARTITIONS =>
        transformMapPartitions(rel.getMapPartitions)
      case proto.Relation.RelTypeCase.GROUP_MAP =>
        transformGroupMap(rel.getGroupMap)
      case proto.Relation.RelTypeCase.CO_GROUP_MAP =>
        transformCoGroupMap(rel.getCoGroupMap)
      case proto.Relation.RelTypeCase.APPLY_IN_PANDAS_WITH_STATE =>
        transformApplyInPandasWithState(rel.getApplyInPandasWithState)
      case proto.Relation.RelTypeCase.COMMON_INLINE_USER_DEFINED_TABLE_FUNCTION =>
        transformCommonInlineUserDefinedTableFunction(
          rel.getCommonInlineUserDefinedTableFunction)
      case proto.Relation.RelTypeCase.CACHED_REMOTE_RELATION =>
        transformCachedRemoteRelation(rel.getCachedRemoteRelation)
      case proto.Relation.RelTypeCase.COLLECT_METRICS =>
        transformCollectMetrics(rel.getCollectMetrics, rel.getCommon.getPlanId)
      case proto.Relation.RelTypeCase.PARSE => transformParse(rel.getParse)
      // Catalog API (internal-only)
      case proto.Relation.RelTypeCase.CATALOG => transformCatalog(rel.getCatalog)
      // ML Relation
      case proto.Relation.RelTypeCase.ML_RELATION =>
        MLHandler.transformMLRelation(rel.getMlRelation, sessionHolder).logicalPlan
      // Handle plugins for Spark Connect Relation types.
      case proto.Relation.RelTypeCase.EXTENSION =>
        transformRelationPlugin(rel.getExtension)
      // Error cases.
      // NOTE(review): a relation type unknown to this server's proto definition is presumably
      // parsed by protobuf as RELTYPE_NOT_SET rather than reaching the fallback below — confirm.
      case proto.Relation.RelTypeCase.RELTYPE_NOT_SET =>
        throw new IndexOutOfBoundsException("Expected Relation to be set, but is empty.")
      // Name the unhandled relation type itself so the error is actionable; the `Unknown`
      // sub-message does not identify which relation type was rejected.
      case unsupported => throw InvalidPlanInput(s"$unsupported not supported.")
    }
    // Propagate the client-assigned plan id, when present, onto the resolved plan.
    if (rel.hasCommon && rel.getCommon.hasPlanId) {
      plan.setTagValue(LogicalPlan.PLAN_ID_TAG, rel.getCommon.getPlanId)
    }
    plan
  }
}