in modules/spark-ext/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala [59:127]
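    /**
     * @param db Ignite schema name.
     * @return Catalog descriptor of the Ignite schema, backed by the Ignite URI.
     */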
    override def getDatabase(db: String): CatalogDatabase =
        CatalogDatabase(db, db, IGNITE_URI, Map.empty)

    /**
     * Checks whether an Ignite schema with the provided name exists.
     *
     * @param schema Ignite schema name or <code>SessionCatalog.DEFAULT_DATABASE</code>.
     * @return True if the Ignite schema exists.
     */
    override def databaseExists(schema: String): Boolean =
        schema == DEFAULT_DATABASE || allSchemas(ignite).exists(schema.equalsIgnoreCase)

    /**
     * @return List of all known Ignite schemas.
     */
    override def listDatabases(): Seq[String] =
        allSchemas(ignite)

    /**
     * @param pattern Pattern to filter database names.
     * @return List of all known Ignite schema names that match the pattern.
     */
    override def listDatabases(pattern: String): Seq[String] =
        StringUtils.filterPattern(listDatabases(), pattern)
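    // Note: `StringUtils.filterPattern` is the Spark helper behind SHOW DATABASES LIKE;
    // to the best of our knowledge it treats '*' as a wildcard and '|' as an alternative
    // separator, e.g. listDatabases("PUB*|IGNITE*").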

    /**
     * Sets the current Ignite schema.
     *
     * @param schema Name of the Ignite schema.
     */
    override def setCurrentDatabase(schema: String): Unit =
        currentSchema = schema

    /** @inheritdoc */
    override def getTable(db: String, table: String): CatalogTable = getTableOption(db, table).get
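
    /**
     * Looks up catalog metadata for a given Ignite SQL table.
     *
     * @param db Ignite schema name or <code>SessionCatalog.DEFAULT_DATABASE</code>.
     * @param tabName Name of the table to look up.
     * @return Catalog description of the table, or <code>None</code> if no such table exists.
     */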
    def getTableOption(db: String, tabName: String): Option[CatalogTable] = {
        val gridName = igniteName(ignite)
        val schemaName = schemaOrDefault(db, currentSchema)

        // Resolve table metadata from the Ignite cluster.
        sqlTableInfo(ignite, tabName, Some(db)) match {
            case Some(table) ⇒
                val tableName = table.tableName

                Some(new CatalogTable(
                    identifier = new TableIdentifier(tableName, Some(schemaName)),
                    tableType = CatalogTableType.EXTERNAL,
                    storage = CatalogStorageFormat(
                        locationUri = Some(URI.create(IGNITE_PROTOCOL + schemaName + "/" + tableName)),
                        inputFormat = Some(FORMAT_IGNITE),
                        outputFormat = Some(FORMAT_IGNITE),
                        serde = None,
                        compressed = false,
                        properties = Map(
                            OPTION_GRID → gridName,
                            OPTION_TABLE → tableName)
                    ),
                    schema = schema(table),
                    provider = Some(FORMAT_IGNITE),
                    partitionColumnNames =
                        // Prefer the full set of key fields; fall back to the single
                        // key field name when that set is empty.
                        if (!allKeyFields(table).isEmpty)
                            allKeyFields(table).toSeq
                        else
                            Seq(table.keyFieldName),
                    bucketSpec = None))

            case None ⇒ None
        }
    }
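
// Usage sketch (illustrative, not part of the original file): resolving Ignite schemas
// and tables through this catalog from Spark SQL. `IgniteSparkSession` is the public
// entry point of the ignite-spark module; the config path and table name below are
// placeholders.
//
//     val spark = IgniteSparkSession.builder()
//         .appName("ignite-catalog-example")
//         .master("local")
//         .igniteConfig("/path/to/ignite-config.xml") // placeholder path
//         .getOrCreate()
//
//     // Served by listDatabases()/databaseExists() above:
//     spark.catalog.listDatabases().show()
//
//     // Table metadata for this query is produced by getTableOption():
//     spark.sql("SELECT * FROM person").show()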