in finagle-postgresql/src/main/scala/com/twitter/finagle/postgresql/machine/ExecuteMachine.scala [127:196]
/**
 * Advances the machine by one step in response to a single backend message.
 *
 * The `(state, msg)` pair is matched top to bottom and the first matching case wins, so the
 * specific cases (for example `StreamResult` paired with `ErrorResponse`) must stay above the
 * generic ones that follow them.
 *
 * @param state the state the machine is currently in
 * @param msg the message received from the backend
 * @return either a `Transition` carrying the next state and an action, or a `Complete`
 * @throws PgSqlNoSuchTransition when no case handles the given `(state, msg)` pair
 */
override def receive(
state: State,
msg: BackendMessage
): TransitionResult[State, Response.QueryResponse] =
(state, msg) match {
// Bind acknowledged: move on to Describing without any action.
case (Binding, BindComplete) =>
Transition(Describing, NoOp)
// NoData while describing: go to ExecutingCommand, where only a CommandComplete is handled.
// NOTE(review): presumably this means the portal returns no rows — confirm against the protocol.
case (Describing, NoData) =>
Transition(ExecutingCommand, NoOp)
// Command finished: stash the Command response in Syncing and send Sync; it is delivered
// once ReadyForQuery arrives (see the Syncing case below).
case (ExecutingCommand, CommandComplete(tag)) =>
Transition(Syncing(Some(Return(Response.Command(tag)))), Send(Sync))
// A row description was received but the command completed as a Select affecting 0 rows,
// i.e. before any DataRow: answer with Response.Empty.
// NOTE(review): any other CommandComplete in the Executing state is not matched here and
// falls through to the final catch-all, which throws — confirm that this is intended.
case (
Executing(_),
CommandComplete(CommandTag.AffectedRows(BackendMessage.CommandTag.Select, 0))) =>
Transition(Syncing(Some(Return(Response.Empty))), Send(Sync))
// RowDescription while describing: keep it in the Executing state for the rows that follow.
case (Describing, r: RowDescription) =>
Transition(Executing(r), NoOp)
// Empty query: answer with Response.Empty.
case (Executing(_), EmptyQueryResponse) =>
Transition(Syncing(Some(Return(Response.Empty))), Send(Sync))
// First DataRow: create the streaming state backed by a fresh Pipe, append the row and
// respond right away with the result set while the remaining rows keep arriving.
case (Executing(r), row: DataRow) =>
// NOTE(review): Future.Done is presumably the initial `lastWrite` — confirm against StreamResult.
val stream = StreamResult(r, new Pipe, Future.Done).append(row)
// React to the pipe being closed; depending on the outcome the machine is interrupted.
stream.pipe.onClose.respond {
case Throw(_: PgSqlServerError) =>
// don't terminate the connection on expected exceptions
case Return(StreamTermination.Discarded) =>
// The pipe was discarded: interrupt with a ReaderDiscardedException.
interrupt(new ReaderDiscardedException)
case Throw(reason) =>
// Any other failure is passed to interrupt as is.
interrupt(reason)
case Return(_) =>
// noop
}
Transition(stream, Respond(Return(stream.resultSet(parameters))))
// Subsequent DataRows are appended to the existing stream.
case (r: StreamResult, dr: DataRow) =>
Transition(r.append(dr), NoOp)
case (r: StreamResult, PortalSuspended) =>
// Close the pipe after the last write has finished; liftToTry lets the close run even when
// that write failed. The resulting future is not awaited.
r.lastWrite.liftToTry.unit before r.pipe.close()
// TODO: the ReadyForQuery here is fake
Complete(ReadyForQuery(InTx), None)
case (r: StreamResult, _: CommandComplete) =>
// TODO: handle discard() so the client can cancel the stream
// Close the pipe after the last write has finished (the resulting future is not awaited).
r.lastWrite.liftToTry.unit before r.pipe.close()
// Syncing(None): no response is stashed here; the result set was already passed to Respond
// when the stream was created.
Transition(Syncing(None), Send(Sync))
// Server error while streaming: fail the pipe and stash the same exception as the response.
case (r: StreamResult, e: ErrorResponse) =>
val exception = PgSqlServerError(e)
r.pipe.fail(exception)
Transition(Syncing(Some(Throw(exception))), Send(Sync))
// ReadyForQuery while syncing: complete with whatever response was stashed (possibly None).
case (Syncing(response), r: ReadyForQuery) => Complete(r, response)
// Server error in any other state: stash it as the response and send Sync.
case (_, e: ErrorResponse) =>
Transition(Syncing(Some(Throw(PgSqlServerError(e)))), Send(Sync))
// Notices are ignored in every state; this case must stay above the StreamResult catch-all.
case (state, _: NoticeResponse) => Transition(state, NoOp)
// Any other message while streaming: fail the pipe before throwing.
case (r: StreamResult, msg) =>
val err = PgSqlNoSuchTransition("ExecuteMachine", r, msg)
r.pipe.fail(err)
throw err
// No transition is defined for this (state, msg) pair.
case (state, msg) => throw PgSqlNoSuchTransition("ExecuteMachine", state, msg)
}