in modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala [42:181]
    override def shortName(): String = FORMAT_IGNITE

    /**
     * To create an IgniteRelation we need a link to an Ignite cluster and a table name.
     * To refer to a cluster the user has to specify the config parameter:
     * <ul>
     *     <li><code>config</code> - path to the Ignite configuration file.</li>
     * </ul>
     * An existing table inside Apache Ignite should be referred to via the <code>table</code> parameter.
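     *
     * A minimal read example through the Spark DataFrame API, assuming a SparkSession named `spark`
     * (the config path and table name below are illustrative):
     * {{{
     *     val df = spark.read
     *         .format(FORMAT_IGNITE)
     *         .option(OPTION_CONFIG_FILE, "/path/to/ignite-config.xml")
     *         .option(OPTION_TABLE, "person")
     *         .load()
     * }}}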
     *
     * @param sqlCtx SQLContext.
     * @param params Parameters for relation creation.
     * @return IgniteRelation.
     * @see IgniteRelation
     * @see IgnitionEx#grid(String)
     * @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
     * @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_SCHEMA
     * @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_CONFIG_FILE
     */
    override def createRelation(sqlCtx: SQLContext, params: Map[String, String]): BaseRelation =
        createRelation(
            igniteContext(params, sqlCtx),
            params.getOrElse(OPTION_TABLE, throw new IgniteException("'table' must be specified.")),
            params.get(OPTION_SCHEMA),
            sqlCtx)

    /**
     * Saves `data` to the corresponding Ignite table and returns a Relation for the saved data.
     *
     * To save data or create an IgniteRelation we need a link to an Ignite cluster and a table name.
     * To refer to a cluster the user has to specify the config parameter:
     * <ul>
     *     <li><code>config</code> - path to the Ignite configuration file.</li>
     * </ul>
     * An existing table inside Apache Ignite should be referred to via the <code>table</code> or <code>path</code> parameter.
     *
     * If the table doesn't exist it will be created.
     * If `mode` is Overwrite and the table already exists it will be recreated (DROP TABLE, CREATE TABLE).
     *
     * If table creation is required the user can set the following options:
     *
     * <ul>
     *     <li>`OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS` - required option. Comma separated list of fields for the primary key.</li>
     *     <li>`OPTION_CACHE_FOR_DDL` - required option. Existing cache name for executing SQL DDL statements.</li>
     *     <li>`OPTION_CREATE_TABLE_PARAMETERS` - Ignite specific parameters for a new table. See WITH [https://apacheignite-sql.readme.io/docs/create-table].</li>
     * </ul>
     *
     * Data writes are executed partition by partition. The user can set `OPTION_WRITE_PARTITIONS_NUM` - the number of partitions for the data.
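     *
     * A minimal write example, assuming an existing DataFrame named `df`
     * (the config path, table name and primary key field below are illustrative):
     * {{{
     *     df.write
     *         .format(FORMAT_IGNITE)
     *         .option(OPTION_CONFIG_FILE, "/path/to/ignite-config.xml")
     *         .option(OPTION_TABLE, "person")
     *         .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
     *         .mode(SaveMode.Overwrite)
     *         .save()
     * }}}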
     *
     * @param sqlCtx SQLContext.
     * @param mode Save mode.
     * @param params Additional parameters.
     * @param data Data to save.
     * @return IgniteRelation.
     */
    override def createRelation(sqlCtx: SQLContext,
        mode: SaveMode,
        params: Map[String, String],
        data: DataFrame): BaseRelation = {
        val ctx = igniteContext(params, sqlCtx)

        val tblName = tableName(params)

        val tblInfoOption = sqlTableInfo(ctx.ignite(), tblName, params.get(OPTION_SCHEMA))

        if (tblInfoOption.isDefined) {
            mode match {
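                // Overwrite: validate the create-table options, drop the existing table,
                // recreate it from the DataFrame schema and stream the rows in.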
                case Overwrite ⇒
                    ensureCreateTableOptions(data.schema, params, ctx)

                    dropTable(tblName, ctx.ignite())

                    val createTblOpts = params.get(OPTION_CREATE_TABLE_PARAMETERS)

                    createTable(data.schema,
                        tblName,
                        primaryKeyFields(params),
                        createTblOpts,
                        ctx.ignite())

                    saveTable(data,
                        tblName,
                        params.get(OPTION_SCHEMA),
                        ctx,
                        params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
                        params.get(OPTION_STREAMER_SKIP_STORE).map(_.toBoolean),
                        params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
                        params.get(OPTION_STREAMER_PER_NODE_BUFFER_SIZE).map(_.toInt),
                        params.get(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS).map(_.toInt))
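
                // Append: stream the DataFrame rows into the existing table; the table definition is left untouched.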
                case Append ⇒
                    saveTable(data,
                        tblName,
                        params.get(OPTION_SCHEMA),
                        ctx,
                        params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
                        params.get(OPTION_STREAMER_SKIP_STORE).map(_.toBoolean),
                        params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
                        params.get(OPTION_STREAMER_PER_NODE_BUFFER_SIZE).map(_.toInt),
                        params.get(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS).map(_.toInt))

                case SaveMode.ErrorIfExists ⇒
                    throw new IgniteException(s"Table or view '$tblName' already exists. SaveMode: ErrorIfExists.")

                case SaveMode.Ignore ⇒
                    // With `SaveMode.Ignore` mode, if the table already exists, the save operation is expected
                    // to not save the contents of the DataFrame and to not change the existing data.
                    // Therefore, it is okay to do nothing here and then just return the relation below.
            }
        }
        else {
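            // The table doesn't exist yet: create it from the DataFrame schema and the
            // user-supplied primary key fields, then write the data.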
            ensureCreateTableOptions(data.schema, params, ctx)

            val primaryKeyFields = params(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS).split(",")

            val createTblOpts = params.get(OPTION_CREATE_TABLE_PARAMETERS)

            createTable(data.schema,
                tblName,
                primaryKeyFields,
                createTblOpts,
                ctx.ignite())

            saveTable(data,
                tblName,
                params.get(OPTION_SCHEMA),
                ctx,
                params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
                params.get(OPTION_STREAMER_SKIP_STORE).map(_.toBoolean),
                params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
                params.get(OPTION_STREAMER_PER_NODE_BUFFER_SIZE).map(_.toInt),
                params.get(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS).map(_.toInt))
        }

        createRelation(ctx,
            tblName,
            params.get(OPTION_SCHEMA),
            sqlCtx)
    }