in s2core/src/main/scala/org/apache/s2graph/core/Management.scala [515:574]
def createLabel(label: String,
srcServiceName: String,
srcColumnName: String,
srcColumnType: String,
tgtServiceName: String,
tgtColumnName: String,
tgtColumnType: String,
serviceName: String,
indices: Seq[Index],
props: Seq[Prop],
isDirected: Boolean = true,
consistencyLevel: String = "weak",
hTableName: Option[String] = None,
hTableTTL: Option[Int] = None,
schemaVersion: String = DEFAULT_VERSION,
isAsync: Boolean = false,
compressionAlgorithm: String = "gz",
options: Option[String] = None,
initFetcherWithOptions: Boolean = false
): Try[Label] = {
if (label.length > LABEL_NAME_MAX_LENGTH) throw new LabelNameTooLongException(s"Label name ${label} too long.( max length : ${LABEL_NAME_MAX_LENGTH}} )")
if (hTableName.isEmpty && hTableTTL.isDefined) throw new RuntimeException("if want to specify ttl, give hbaseTableName also")
val labelOpt = Label.findByName(label, useCache = false)
val newLabelTry = Schema withTx { implicit session =>
if (labelOpt.isDefined) throw new LabelAlreadyExistException(s"Label name ${label} already exist.")
/* create all models */
val newLabel = Label.insertAll(label,
srcServiceName, srcColumnName, srcColumnType,
tgtServiceName, tgtColumnName, tgtColumnType,
isDirected, serviceName, indices, props, consistencyLevel,
hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options)
/* create hbase table */
val storage = graph.getStorage(newLabel)
val service = newLabel.service
val config = toConfig(Map(
ZookeeperQuorum -> service.cluster,
// ColumnFamilies -> List("e", "v"),
RegionMultiplier -> service.preSplitSize,
Ttl -> newLabel.hTableTTL,
CompressionAlgorithm -> newLabel.compressionAlgorithm
))
storage.createTable(config, newLabel.hbaseTableName)
newLabel
}
newLabelTry.foreach { newLabel =>
if (initFetcherWithOptions) {
updateEdgeFetcher(newLabel, options)
} else {
updateEdgeFetcher(newLabel, None)
}
}
newLabelTry
}