in cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala [101:187]
def interpret[StatementT <: Statement[StatementT]](session:CqlSession, stringStatements : String,
context: InterpreterContext): InterpreterResult = {
logger.info(s"Executing CQL statements : \n\n$stringStatements\n")
try {
val queries: List[AnyBlock] = parseInput(stringStatements)
val queryOptions = extractQueryOptions(queries
.filter(_.blockType == ParameterBlock)
.map(_.get[QueryParameters]))
val executionFormatter = extractFormatter(context)
logger.info(s"Current Cassandra query options = $queryOptions")
val queryStatements = queries.filter(_.blockType == StatementBlock).map(_.get[QueryStatement])
//Remove prepared statements
queryStatements
.filter(_.statementType == RemovePrepareStatementType)
.map(_.getStatement[RemovePrepareStm])
.foreach(remove => {
logger.debug(s"Removing prepared statement '${remove.name}'")
preparedStatements.remove(remove.name)
})
//Update prepared statement maps
queryStatements
.filter(_.statementType == PrepareStatementType)
.map(_.getStatement[PrepareStm])
.foreach(statement => {
logger.debug(s"Get or prepare statement '${statement.name}' : ${statement.query}")
preparedStatements.getOrElseUpdate(statement.name, session.prepare(statement.query))
})
val statements: Seq[Any] = queryStatements
.filter(st => (st.statementType != PrepareStatementType) && (st.statementType != RemovePrepareStatementType))
.map {
case x: SimpleStm =>
generateSimpleStatement(x, queryOptions, context)
case x: BatchStm =>
val builtStatements = x.statements.map {
case st: SimpleStm => generateSimpleStatement(st, queryOptions, context)
case st: BoundStm => generateBoundStatement(session, st, queryOptions, context)
case _ => throw new InterpreterException(s"Unknown statement type")
}
generateBatchStatement(x.batchType, queryOptions, builtStatements)
case x: BoundStm =>
generateBoundStatement(session, x, queryOptions, context)
case x: DescribeCommandStatement => x
case x: HelpCmd => x
case x => throw new InterpreterException(s"Unknown statement type : $x")
}
val results: Seq[(Any,Any)] = for (statement <- statements)
yield (enhancedSession.execute(statement),statement)
if (results.nonEmpty) {
results.last match {
case(res: ResultSet, st: StatementT) =>
buildResponseMessage((res, st), executionFormatter)
case(output: String, _) => new InterpreterResult(Code.SUCCESS, output)
case _ => throw new InterpreterException(s"Cannot parse result type : ${results.last}")
}
} else {
new InterpreterResult(Code.SUCCESS, enhancedSession.displayNoResult)
}
} catch {
case dex: DriverException =>
logger.error(dex.getMessage, dex)
new InterpreterResult(Code.ERROR, parseException(dex))
case pex:ParsingException =>
logger.error(pex.getMessage, pex)
new InterpreterResult(Code.ERROR, pex.getMessage)
case iex: InterpreterException =>
logger.error(iex.getMessage, iex)
new InterpreterResult(Code.ERROR, iex.getMessage)
case ex: java.lang.Exception =>
logger.error(ex.getMessage, ex)
new InterpreterResult(Code.ERROR, parseException(ex))
}
}