in modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala [40:131]
def igniteName(g: Ignite): String =
if(g.name() != null)
g.name
else
""
/**
* @param schema Name of schema.
* @param default Default schema.
* @return Schema to use.
*/
def schemaOrDefault(schema: String, default: String): String =
if (schema == SessionCatalog.DEFAULT_DATABASE)
default
else
schema
/**
* @param gridName Name of grid.
* @return Named instance of grid. If 'gridName' is empty unnamed instance returned.
*/
def ignite(gridName: String): Ignite =
if (gridName == "")
Ignition.ignite()
else
Ignition.ignite(gridName)
/**
* @param ignite Ignite instance.
* @param tabName Table name.
* @param schemaName Optional schema name.
* @return True if table exists false otherwise.
*/
def sqlTableExists(ignite: Ignite, tabName: String, schemaName: Option[String]): Boolean =
sqlTableInfo(ignite, tabName, schemaName).isDefined
/**
* @param ignite Ignite instance.
* @param tabName Table name.
* @param schemaName Optional schema name.
* @return Cache name for given table.
*/
def sqlCacheName(ignite: Ignite, tabName: String, schemaName: Option[String]): Option[String] =
sqlTableInfo(ignite, tabName, schemaName).map(_.asInstanceOf[QueryTypeDescriptorImpl].cacheName)
/**
* @param ignite Ignite instance.
* @return All schemas in given Ignite instance.
*/
def allSchemas(ignite: Ignite): Seq[String] = ignite.cacheNames
.map(name =>
normalizeSchemaName(name,
ignite.cache[Any,Any](name).getConfiguration(classOf[CacheConfiguration[Any,Any]]).getSqlSchema))
.toSeq
.distinct
/**
* @param ignite Ignite instance.
* @param schemaName Schema name.
* @return All cache configurations for the given schema.
*/
def cachesForSchema[K,V](ignite: Ignite, schemaName: Option[String]): Seq[CacheConfiguration[K,V]] =
ignite.cacheNames
.map(ignite.cache[K,V](_).getConfiguration(classOf[CacheConfiguration[K,V]]))
.filter(ccfg =>
schemaName.forall(normalizeSchemaName(ccfg.getName, ccfg.getSqlSchema).equalsIgnoreCase(_)) ||
schemaName.contains(SessionCatalog.DEFAULT_DATABASE))
.toSeq
/**
* @param ignite Ignite instance.
* @param tabName Table name.
* @param schemaName Optional schema name.
* @return GridQueryTypeDescriptor for a given table.
*/
def sqlTableInfo(ignite: Ignite, tabName: String, schemaName: Option[String]): Option[GridQueryTypeDescriptor] =
ignite.asInstanceOf[IgniteEx].context.cache.publicCacheNames
.flatMap(cacheName => ignite.asInstanceOf[IgniteEx].context.query.types(cacheName))
.find(table => table.tableName.equalsIgnoreCase(tabName) && isValidSchema(table, schemaName))
/**
* @param table GridQueryTypeDescriptor for a given table.
* @param schemaName Optional schema name.
* @return `True` if schema is valid.
*/
def isValidSchema(table: GridQueryTypeDescriptor, schemaName: Option[String]): Boolean =
schemaName match {
case Some(schema) =>
schema.equalsIgnoreCase(table.schemaName) || schema.equals(SessionCatalog.DEFAULT_DATABASE)
case None =>
true
}