in src/main/java/com/google/cloud/spanner/pgadapter/wireprotocol/MessageReader.java [49:132]
public ControlMessage create(ConnectionHandler connection) throws Exception {
  // Reads the identifier of the next message and builds the ControlMessage that belongs to it.
  // Which identifiers are accepted depends on whether the connection is in COPY_IN status.
  // Messages that are flagged as unexpected increase the connection's invalid message counter;
  // all other outcomes (including exceptions raised without the flag being set) reset it.
  boolean countAsInvalid = false;
  final char identifier = readNextMsgIdentifier(connection);
  try {
    if (connection.getStatus() != ConnectionStatus.COPY_IN) {
      // Regular (non-COPY) message handling. Every branch returns or throws.
      switch (identifier) {
        case QueryMessage.IDENTIFIER:
          return new QueryMessage(connection);
        case ParseMessage.IDENTIFIER:
          return new ParseMessage(connection);
        case BindMessage.IDENTIFIER:
          return new BindMessage(connection);
        case DescribeMessage.IDENTIFIER:
          return new DescribeMessage(connection);
        case ExecuteMessage.IDENTIFIER:
          return new ExecuteMessage(connection);
        case CloseMessage.IDENTIFIER:
          return new CloseMessage(connection);
        case TerminateMessage.IDENTIFIER:
          return new TerminateMessage(connection);
        case FunctionCallMessage.IDENTIFIER:
          return new FunctionCallMessage(connection);
        case FlushMessage.IDENTIFIER:
          return new FlushMessage(connection);
        case SyncMessage.IDENTIFIER:
          return new SyncMessage(connection);
        case CopyDoneMessage.IDENTIFIER:
        case CopyDataMessage.IDENTIFIER:
        case CopyFailMessage.IDENTIFIER:
          // COPY messages outside of COPY mode are skipped without raising an error, in line
          // with the PG wire protocol. They are still counted as invalid, so that a client that
          // keeps sending them is eventually disconnected instead of flooding the server.
          countAsInvalid = true;
          // The identifier was recognized, so the stream itself is still considered valid.
          return SkipMessage.createForValidStream(connection);
        default:
          throw new IllegalStateException(String.format("Unknown message: %c", identifier));
      }
    }

    // From here on the connection is in COPY_IN status.
    switch (identifier) {
      case CopyDataMessage.IDENTIFIER:
        return new CopyDataMessage(connection);
      case CopyDoneMessage.IDENTIFIER:
        return new CopyDoneMessage(connection);
      case CopyFailMessage.IDENTIFIER:
        return new CopyFailMessage(connection);
      case FlushMessage.IDENTIFIER:
      case SyncMessage.IDENTIFIER:
        // Flush/sync are ignored during COPY_IN, as real PostgreSQL does. Some clients do not
        // inspect the type of statement they sent in an ExecuteMessage and always follow each
        // execute with a flush/sync.
        return SkipMessage.createForValidStream(connection);
      default:
        // Anything else is unexpected here: skip it and fail the copy operation.
        countAsInvalid = true;
        SkipMessage.createForInvalidStream(connection);
        String unexpected =
            String.format(
                "Expected CopyData ('d'), CopyDone ('c') or CopyFail ('f') messages, got: '%c'",
                identifier);
        throw new IllegalStateException(unexpected);
    }
  } finally {
    // Runs for both returned messages and thrown exceptions.
    if (countAsInvalid) {
      connection.increaseInvalidMessageCount();
      if (connection.getInvalidMessageCount() > MAX_INVALID_MESSAGE_COUNT) {
        // Too many unexpected messages: report a FATAL protocol violation and mark the
        // connection as terminated.
        PGException tooManyInvalidMessages =
            PGException.newBuilder(
                    String.format(
                        "Received %d invalid/unexpected messages. Last received message: '%c'",
                        connection.getInvalidMessageCount(), identifier))
                .setSQLState(SQLState.ProtocolViolation)
                .setSeverity(Severity.FATAL)
                .build();
        new ErrorResponse(connection, tooManyInvalidMessages).send();
        connection.setStatus(ConnectionStatus.TERMINATED);
      }
    } else {
      connection.clearInvalidMessageCount();
    }
  }
}