override def shortName()

in modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala [42:181]


    override def shortName(): String = {
        // The short name of this provider is the shared FORMAT_IGNITE constant.
        FORMAT_IGNITE
    }

    /**
      * To create an IgniteRelation we need a link to an Ignite cluster and a table name.
      * To refer to the cluster, the user has to specify one of the following config parameters:
      * <ul>
      *     <li><code>config</code> - path to the Ignite configuration file.</li>
      * </ul>
      * An existing table inside Apache Ignite should be referred to via the <code>table</code> parameter.
      *
      * @param sqlCtx SQLContext.
      * @param params Parameters for relation creation.
      * @return IgniteRelation.
      * @see IgniteRelation
      * @see IgnitionEx#grid(String)
      * @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
      * @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_SCHEMA
      * @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_CONFIG_FILE
      */
    override def createRelation(sqlCtx: SQLContext, params: Map[String, String]): BaseRelation = {
        // The Ignite context is resolved first, before the table name is validated.
        val igniteCtx = igniteContext(params, sqlCtx)

        // A table name is mandatory here: fail with a descriptive error when it is absent.
        val tblName = params.get(OPTION_TABLE) match {
            case Some(name) => name
            case None => throw new IgniteException("'table' must be specified.")
        }

        // The schema is optional and is passed through as an Option.
        val schemaOpt = params.get(OPTION_SCHEMA)

        createRelation(igniteCtx, tblName, schemaOpt, sqlCtx)
    }

    /**
      * Saves `data` to the corresponding Ignite table and returns a relation for the saved data.
      *
      * To save data or create an IgniteRelation we need a link to an Ignite cluster and a table name.
      * To refer to the cluster, the user has to specify one of the following config parameters:
      * <ul>
      *     <li><code>config</code> - path to the Ignite configuration file.</li>
      * </ul>
      * An existing table inside Apache Ignite should be referred to via the <code>table</code> or <code>path</code> parameter.
      *
      * If the table doesn't exist it will be created.
      * If `mode` is Overwrite and `table` already exists it will be recreated (DROP TABLE, CREATE TABLE).
      *
      * If table creation is required, the user can set the following options:
      *
      * <ul>
      *     <li>`OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS` - required option. Comma-separated list of fields for the primary key.</li>
      *     <li>`OPTION_CACHE_FOR_DDL` - required option. Existing cache name for executing SQL DDL statements.</li>
      *     <li>`OPTION_CREATE_TABLE_PARAMETERS` - Ignite-specific parameters for a new table. See WITH [https://apacheignite-sql.readme.io/docs/create-table].</li>
      * </ul>
      *
      * The data write is executed 'by partition'. The user can set `OPTION_WRITE_PARTITIONS_NUM` - the number of partitions for the data.
      *
      * @param sqlCtx SQLContext.
      * @param mode Save mode.
      * @param params Additional parameters.
      * @param data Data to save.
      * @return IgniteRelation.
      */
    override def createRelation(sqlCtx: SQLContext,
        mode: SaveMode,
        params: Map[String, String],
        data: DataFrame): BaseRelation = {

        val ctx = igniteContext(params, sqlCtx)

        val tblName = tableName(params)

        val schemaOpt = params.get(OPTION_SCHEMA)

        val tblExists = sqlTableInfo(ctx.ignite(), tblName, schemaOpt).isDefined

        // Parses the optional data streamer settings immediately, so that a malformed value
        // (e.g. a non-numeric flush frequency) fails right here, and returns the action that
        // actually writes `data` into the table. Nothing is written until the action is invoked.
        def prepareSave(): () => Unit = {
            val allowOverwrite = params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean)
            val skipStore = params.get(OPTION_STREAMER_SKIP_STORE).map(_.toBoolean)
            val flushFreq = params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong)
            val perNodeBufSize = params.get(OPTION_STREAMER_PER_NODE_BUFFER_SIZE).map(_.toInt)
            val perNodeParallelOps = params.get(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS).map(_.toInt)

            () => saveTable(data,
                tblName,
                schemaOpt,
                ctx,
                allowOverwrite,
                skipStore,
                flushFreq,
                perNodeBufSize,
                perNodeParallelOps)
        }

        // Creates the table from the schema of `data` (dropping the existing one first when
        // `dropExisting` is set) and then writes `data` into it.
        def createAndSave(dropExisting: Boolean): Unit = {
            ensureCreateTableOptions(data.schema, params, ctx)

            // Resolve every user-supplied option BEFORE any DDL is executed. Otherwise a bad
            // streamer option would only surface after the existing table had already been
            // dropped and recreated empty.
            val pkFields = primaryKeyFields(params)
            val createTblOpts = params.get(OPTION_CREATE_TABLE_PARAMETERS)
            val save = prepareSave()

            if (dropExisting)
                dropTable(tblName, ctx.ignite())

            createTable(data.schema,
                tblName,
                pkFields,
                createTblOpts,
                ctx.ignite())

            save()
        }

        if (tblExists) {
            mode match {
                case SaveMode.Overwrite =>
                    // Recreate the table (DROP TABLE, CREATE TABLE) and fill it with `data`.
                    createAndSave(dropExisting = true)

                case SaveMode.Append =>
                    // Table is kept as is; `data` is written into it.
                    prepareSave().apply()

                case SaveMode.ErrorIfExists =>
                    throw new IgniteException(s"Table or view '$tblName' already exists. SaveMode: ErrorIfExists.")

                case SaveMode.Ignore =>
                    // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
                    // to not save the contents of the DataFrame and to not change the existing data.
                    // Therefore, it is okay to do nothing here and then just return the relation below.
            }
        }
        else {
            // Table does not exist yet: it is created regardless of the save mode.
            createAndSave(dropExisting = false)
        }

        createRelation(ctx,
            tblName,
            schemaOpt,
            sqlCtx)
    }