override def receive()

in finagle-postgresql/src/main/scala/com/twitter/finagle/postgresql/machine/ExecuteMachine.scala [127:196]


  override def receive(
    state: State,
    msg: BackendMessage
  ): TransitionResult[State, Response.QueryResponse] =
    (state, msg) match {
      case (Binding, BindComplete) =>
        Transition(Describing, NoOp)

      case (Describing, NoData) =>
        Transition(ExecutingCommand, NoOp)
      case (ExecutingCommand, CommandComplete(tag)) =>
        Transition(Syncing(Some(Return(Response.Command(tag)))), Send(Sync))
      case (
            Executing(_),
            CommandComplete(CommandTag.AffectedRows(BackendMessage.CommandTag.Select, 0))) =>
        Transition(Syncing(Some(Return(Response.Empty))), Send(Sync))

      case (Describing, r: RowDescription) =>
        Transition(Executing(r), NoOp)

      case (Executing(_), EmptyQueryResponse) =>
        Transition(Syncing(Some(Return(Response.Empty))), Send(Sync))

      case (Executing(r), row: DataRow) =>
        val stream = StreamResult(r, new Pipe, Future.Done).append(row)

        stream.pipe.onClose.respond {
          case Throw(_: PgSqlServerError) =>
          // don't terminate the connection on expected exceptions
          case Return(StreamTermination.Discarded) =>
            interrupt(new ReaderDiscardedException)
          case Throw(reason) =>
            interrupt(reason)
          case Return(_) =>
          // noop
        }

        Transition(stream, Respond(Return(stream.resultSet(parameters))))

      case (r: StreamResult, dr: DataRow) =>
        Transition(r.append(dr), NoOp)
      case (r: StreamResult, PortalSuspended) =>
        r.lastWrite.liftToTry.unit before r.pipe.close()
        // TODO: the ReadyForQuery here is fake
        Complete(ReadyForQuery(InTx), None)

      case (r: StreamResult, _: CommandComplete) =>
        // TODO: handle discard() to client can cancel the stream
        r.lastWrite.liftToTry.unit before r.pipe.close()
        Transition(Syncing(None), Send(Sync))

      case (r: StreamResult, e: ErrorResponse) =>
        val exception = PgSqlServerError(e)
        r.pipe.fail(exception)
        Transition(Syncing(Some(Throw(exception))), Send(Sync))

      case (Syncing(response), r: ReadyForQuery) => Complete(r, response)

      case (_, e: ErrorResponse) =>
        Transition(Syncing(Some(Throw(PgSqlServerError(e)))), Send(Sync))

      case (state, _: NoticeResponse) => Transition(state, NoOp)

      case (r: StreamResult, msg) =>
        val err = PgSqlNoSuchTransition("ExecuteMachine", r, msg)
        r.pipe.fail(err)
        throw err

      case (state, msg) => throw PgSqlNoSuchTransition("ExecuteMachine", state, msg)
    }