in src/main/scala/org/apache/spark/sql/DataFrameJdbcWriter.scala [80:131]
/**
 * Writes the DataFrame to a JDBC table, creating the table first if necessary.
 *
 * Behavior by save mode (for a pre-existing table):
 *  - Ignore: no-op, nothing is written.
 *  - ErrorIfExists: fails via sys.error.
 *  - Overwrite: drops and recreates the table.
 *
 * @param url                  JDBC connection URL
 * @param table                target table name
 * @param primaryKeys          primary-key columns; non-empty enables retry (upsert-style) writes
 * @param indexColumns         columns to index in the generated CREATE TABLE
 * @param textColumns          columns to map to TEXT in the generated CREATE TABLE
 * @param postWriteSql         SQL to run after the write completes
 * @param writesPerSecond      write throttle passed to JdbcWriterUtils.saveTable
 * @param connectionProperties JDBC connection properties
 */
def jdbc(url: String, table: String, primaryKeys: Seq[String], indexColumns: Seq[String], textColumns: Seq[String], postWriteSql: String, writesPerSecond: Double, connectionProperties: Properties): Unit = {
  val props = new Properties()
  // connectionProperties should override settings in extraOptions
  props.putAll(connectionProperties)

  val conn = JdbcUtils.createConnectionFactory(url, props)()
  // Resolve save-mode semantics and (re)create the table as needed.
  // Returns false only for SaveMode.Ignore on an existing table, in which
  // case the write below is skipped entirely.
  val shouldWrite = try {
    val tableExists = JdbcUtils.tableExists(conn, url, table)
    if (mode == SaveMode.Ignore && tableExists) {
      false
    } else {
      if (mode == SaveMode.ErrorIfExists && tableExists) {
        sys.error(s"Table $table already exists.")
      }
      // Overwrite drops the old table; either way we must create when no table remains.
      val needCreate = if (mode == SaveMode.Overwrite && tableExists) {
        JdbcUtils.dropTable(conn, table)
        true
      } else {
        !tableExists
      }
      if (needCreate) {
        val columnNames = df.schema.fields.map(_.name).toList.asJava
        val columnTypes = df.schema.fields.map(_.dataType).toList.asJava
        // DDL is generated by the uberscriptquery helper so primary keys,
        // indexes, and TEXT column mappings are included.
        val sql = com.uber.uberscriptquery.jdbc.JdbcUtils.getCreateTableSql(
          columnNames, columnTypes, table, primaryKeys.toList.asJava, indexColumns.toList.asJava, textColumns.toList.asJava)
        val statement = conn.createStatement
        try {
          println(s"Creating jdbc table: $sql")
          statement.executeUpdate(sql)
          println(s"Created jdbc table: $sql")
        } finally {
          statement.close()
        }
      }
      true
    }
  } finally {
    conn.close()
  }

  if (shouldWrite) {
    // Retrying individual writes is only safe/meaningful when primary keys
    // make re-writes idempotent.
    val retry = primaryKeys.nonEmpty
    println(s"Save to table $table with retry $retry")
    JdbcWriterUtils.saveTable(df, url, table, textColumns, postWriteSql, retry, writesPerSecond, props)
  }
}