def interpret[StatementT <: Statement[StatementT]]()

in cassandra/src/main/scala/org/apache/zeppelin/cassandra/InterpreterLogic.scala [101:187]


  /**
   * Parses a paragraph of CQL text, executes every statement it contains and renders the
   * outcome of the last one.
   *
   * Steps, in order:
   *  1. parse `stringStatements` into blocks and derive the query options from the parameter blocks;
   *  2. drop the prepared statements named by the "remove prepare" statements;
   *  3. register the prepared statements declared by the "prepare" statements;
   *  4. build an executable form of every remaining statement and run each one through `enhancedSession`;
   *  5. convert the result of the last execution into an [[InterpreterResult]].
   *
   * Any `java.lang.Exception` raised along the way is logged and reported as a `Code.ERROR`
   * result instead of being propagated to the caller.
   *
   * @param session          session used to prepare statements and to build bound statements
   * @param stringStatements raw statement text to interpret
   * @param context          interpreter context, forwarded to the formatter and statement builders
   * @tparam StatementT      statement type the last executed statement is matched against
   * @return the rendering of the last result, the "no result" message when nothing was executed,
   *         or an error result describing the failure
   */
  def interpret[StatementT <: Statement[StatementT]](session:CqlSession, stringStatements : String,
                                                     context: InterpreterContext): InterpreterResult = {

    logger.info(s"Executing CQL statements : \n\n$stringStatements\n")

    try {
      val blocks: List[AnyBlock] = parseInput(stringStatements)

      // Parameter blocks carry the per-paragraph query options.
      val parameterBlocks = blocks.collect {
        case block if block.blockType == ParameterBlock => block.get[QueryParameters]
      }
      val queryOptions = extractQueryOptions(parameterBlocks)
      val formatter = extractFormatter(context)

      logger.info(s"Current Cassandra query options = $queryOptions")

      val parsedStatements = blocks.collect {
        case block if block.blockType == StatementBlock => block.get[QueryStatement]
      }

      // Removals are applied before registrations, whatever their order in the input.
      val removals = parsedStatements.collect {
        case stm if stm.statementType == RemovePrepareStatementType => stm.getStatement[RemovePrepareStm]
      }
      for (remove <- removals) {
        logger.debug(s"Removing prepared statement '${remove.name}'")
        preparedStatements.remove(remove.name)
      }

      // Register the statements declared in this paragraph under their names.
      val registrations = parsedStatements.collect {
        case stm if stm.statementType == PrepareStatementType => stm.getStatement[PrepareStm]
      }
      for (statement <- registrations) {
        logger.debug(s"Get or prepare statement '${statement.name}' : ${statement.query}")
        preparedStatements.getOrElseUpdate(statement.name, session.prepare(statement.query))
      }

      // Everything that is neither a "prepare" nor a "remove prepare" gets an executable form.
      val statements: Seq[Any] = parsedStatements
        .filterNot { stm =>
          stm.statementType == PrepareStatementType || stm.statementType == RemovePrepareStatementType
        }
        .map {
          case x: SimpleStm =>
            generateSimpleStatement(x, queryOptions, context)

          case x: BoundStm =>
            generateBoundStatement(session, x, queryOptions, context)

          case x: BatchStm =>
            // Only simple and bound statements are accepted as batch members.
            val batchMembers = x.statements.map {
              case member: SimpleStm => generateSimpleStatement(member, queryOptions, context)
              case member: BoundStm => generateBoundStatement(session, member, queryOptions, context)
              case _ => throw new InterpreterException(s"Unknown statement type")
            }
            generateBatchStatement(x.batchType, queryOptions, batchMembers)

          // Describe and help commands are handed to the session as they are.
          case x: DescribeCommandStatement => x
          case x: HelpCmd => x
          case x => throw new InterpreterException(s"Unknown statement type : $x")
        }

      // Run every statement, keeping each outcome paired with the statement that produced it.
      val results: Seq[(Any,Any)] =
        statements.map(statement => (enhancedSession.execute(statement), statement))

      // Only the outcome of the last statement is rendered.
      results.lastOption match {
        case None =>
          new InterpreterResult(Code.SUCCESS, enhancedSession.displayNoResult)

        // NOTE(review): StatementT is a type parameter, so this type pattern is presumably
        // unchecked because of erasure -- confirm it matches only what is intended.
        case Some((res: ResultSet, st: StatementT)) =>
          buildResponseMessage((res, st), formatter)

        case Some((output: String, _)) =>
          new InterpreterResult(Code.SUCCESS, output)

        case Some(_) =>
          throw new InterpreterException(s"Cannot parse result type : ${results.last}")
      }
    } catch {
      // Clause order matters: the specific exception types must precede the generic fallback.
      case driverError: DriverException =>
        logger.error(driverError.getMessage, driverError)
        new InterpreterResult(Code.ERROR, parseException(driverError))

      case parsingError: ParsingException =>
        logger.error(parsingError.getMessage, parsingError)
        new InterpreterResult(Code.ERROR, parsingError.getMessage)

      case interpreterError: InterpreterException =>
        logger.error(interpreterError.getMessage, interpreterError)
        new InterpreterResult(Code.ERROR, interpreterError.getMessage)

      case otherError: java.lang.Exception =>
        logger.error(otherError.getMessage, otherError)
        new InterpreterResult(Code.ERROR, parseException(otherError))
    }
  }