override def createRelation()

in core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala [108:141]


  /**
   * Writes `data` to EventHubs and hands back a placeholder relation.
   *
   * The connector's user agent is assigned first, the save mode is validated next, and only
   * then is the DataFrame's query execution passed to `EventHubsWriter.write`.
   *
   * @param outerSQLContext context whose Spark session is used for the write
   * @param mode            requested save mode; `Overwrite` and `Ignore` are rejected
   * @param parameters      options forwarded unchanged to the writer
   * @param data            DataFrame whose query execution is written
   * @return a `BaseRelation` whose overridden members all throw
   *         `UnsupportedOperationException`
   * @throws AnalysisException if `mode` is `SaveMode.Overwrite` or `SaveMode.Ignore`
   */
  override def createRelation(outerSQLContext: SQLContext,
                              mode: SaveMode,
                              parameters: Map[String, String],
                              data: DataFrame): BaseRelation = {
    val session = outerSQLContext.sparkSession
    val sparkVersion = session.sparkContext.version

    // The user agent is set before validation, so it is updated even when the mode is rejected.
    EventHubsClient.userAgent = s"Structured-Streaming-$SparkConnectorVersion-$sparkVersion"

    // Every mode other than Overwrite and Ignore is let through.
    val modeRejected = mode == SaveMode.Overwrite || mode == SaveMode.Ignore
    if (modeRejected) {
      throw new AnalysisException(
        s"Save mode $mode not allowed for EventHubs. " +
          s"Allowed save modes are ${SaveMode.Append} and ${SaveMode.ErrorIfExists} (default).")
    }

    EventHubsWriter.write(session, data.queryExecution, parameters)

    // This method is supposed to return a relation that reads back the data just written.
    // That cannot be supported for EventHubs, so for consistency a relation is returned
    // whose members all fail when touched.
    new BaseRelation {
      private def notUsable: Nothing =
        throw new UnsupportedOperationException(
          "BaseRelation from EventHubs write operation is not usable.")

      override def sqlContext: SQLContext = notUsable
      override def schema: StructType = notUsable
      override def needConversion: Boolean = notUsable
      override def sizeInBytes: Long = notUsable
      override def unhandledFilters(filters: Array[Filter]): Array[Filter] = notUsable
    }
  }