in core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala [108:141]
override def createRelation(outerSQLContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
EventHubsClient.userAgent =
s"Structured-Streaming-$SparkConnectorVersion-${outerSQLContext.sparkSession.sparkContext.version}"
mode match {
case SaveMode.Overwrite | SaveMode.Ignore =>
throw new AnalysisException(
s"Save mode $mode not allowed for EventHubs. " +
s"Allowed save modes are ${SaveMode.Append} and " +
s"${SaveMode.ErrorIfExists} (default).")
case _ => // good
}
EventHubsWriter.write(outerSQLContext.sparkSession, data.queryExecution, parameters)
/* This method is suppose to return a relation that reads the data that was written.
* We cannot support this for EventHubs. Therefore, in order to make things consistent,
* we return an empty base relation.
*/
new BaseRelation {
override def sqlContext: SQLContext = unsupportedException
override def schema: StructType = unsupportedException
override def needConversion: Boolean = unsupportedException
override def sizeInBytes: Long = unsupportedException
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException
private def unsupportedException =
throw new UnsupportedOperationException(
"BaseRelation from EventHubs write " +
"operation is not usable.")
}
}