in src/main/java/com/amazon/redshift/core/v3/CopyQueryExecutor.java [159:397]
CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)
throws SQLException, IOException {
/*
* fixes issue #1592 where one thread closes the stream and another is reading it
*/
if (pgStream.isClosed()) {
throw new RedshiftException(GT.tr("RedshiftStream is closed",
op.getClass().getName()), RedshiftState.CONNECTION_DOES_NOT_EXIST);
}
/*
* This is a hack as we should not end up here, but sometimes do with large copy operations.
*/
if ( processingCopyResults.compareAndSet(false,true) == false ) {
if(RedshiftLogger.isEnable())
logger.log(LogLevel.INFO, "Ignoring request to process copy results, already processing");
return null;
}
// put this all in a try, finally block and reset the processingCopyResults in the finally clause
try {
boolean endReceiving = false;
SQLException error = null;
SQLException errors = null;
int len;
while (!endReceiving && (block || pgStream.hasMessagePending())) {
// There is a bug in the server's implementation of the copy
// protocol. It returns command complete immediately upon
// receiving the EOF marker in the binary protocol,
// potentially before we've issued CopyDone. When we are not
// blocking, we don't think we are done, so we hold off on
// processing command complete and any subsequent messages
// until we actually are done with the copy.
//
if (!block) {
int c = pgStream.peekChar();
if (c == 'C') {
// CommandComplete
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE CommandStatus, Ignored until CopyDone");
break;
}
}
int c = pgStream.receiveChar();
switch (c) {
case 'A': // Asynchronous Notify
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE Asynchronous Notification while copying");
queryExecutor.receiveAsyncNotify();
break;
case 'N': // Notice Response
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE Notification while copying");
queryExecutor.addWarning(queryExecutor.receiveNoticeResponse());
break;
case 'C': // Command Complete
String status = queryExecutor.receiveCommandStatus();
try {
if (op == null) {
throw new RedshiftException(GT
.tr("Received CommandComplete ''{0}'' without an active copy operation", status),
RedshiftState.OBJECT_NOT_IN_STATE);
}
op.handleCommandStatus(status);
} catch (SQLException se) {
error = se;
}
block = true;
break;
case 'E': // ErrorMessage (expected response to CopyFail)
error = queryExecutor.receiveErrorResponse(false);
// We've received the error and we now expect to receive
// Ready for query, but we must block because it might still be
// on the wire and not here yet.
block = true;
break;
case 'G': // CopyInResponse
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE CopyInResponse");
if (op != null) {
error = new RedshiftException(GT.tr("Got CopyInResponse from server during an active {0}",
op.getClass().getName()), RedshiftState.OBJECT_NOT_IN_STATE);
}
op = new CopyInImpl();
initCopy(op);
endReceiving = true;
break;
case 'H': // CopyOutResponse
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE CopyOutResponse");
if (op != null) {
error = new RedshiftException(GT.tr("Got CopyOutResponse from server during an active {0}",
op.getClass().getName()), RedshiftState.OBJECT_NOT_IN_STATE);
}
op = new CopyOutImpl();
initCopy(op);
endReceiving = true;
break;
case 'W': // CopyBothResponse
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE CopyBothResponse");
if (op != null) {
error = new RedshiftException(GT.tr("Got CopyBothResponse from server during an active {0}",
op.getClass().getName()), RedshiftState.OBJECT_NOT_IN_STATE);
}
op = new CopyDualImpl();
initCopy(op);
endReceiving = true;
break;
case 'd': // CopyData
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE CopyData");
len = pgStream.receiveInteger4() - 4;
assert len > 0 : "Copy Data length must be greater than 4";
byte[] buf = pgStream.receive(len);
if (op == null) {
error = new RedshiftException(GT.tr("Got CopyData without an active copy operation"),
RedshiftState.OBJECT_NOT_IN_STATE);
} else if (!(op instanceof CopyOut)) {
error = new RedshiftException(
GT.tr("Unexpected copydata from server for {0}", op.getClass().getName()),
RedshiftState.COMMUNICATION_ERROR);
} else {
op.handleCopydata(buf);
}
endReceiving = true;
break;
case 'c': // CopyDone (expected after all copydata received)
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE CopyDone");
len = pgStream.receiveInteger4() - 4;
if (len > 0) {
pgStream.receive(len); // not in specification; should never appear
}
if (!(op instanceof CopyOut)) {
error = new RedshiftException("Got CopyDone while not copying from server",
RedshiftState.OBJECT_NOT_IN_STATE);
}
// keep receiving since we expect a CommandComplete
block = true;
break;
case 'S': // Parameter Status
try {
queryExecutor.receiveParameterStatus();
} catch (SQLException e) {
error = e;
endReceiving = true;
}
break;
case 'Z': // ReadyForQuery: After FE:CopyDone => BE:CommandComplete
queryExecutor.receiveRFQ();
if (queryExecutor.hasLock(op)) {
queryExecutor.unlock(op);
}
op = null;
endReceiving = true;
break;
// If the user sends a non-copy query, we've got to handle some additional things.
//
case 'T': // Row Description (response to Describe)
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE RowDescription (during copy ignored)");
queryExecutor.skipMessage();
break;
case 'D': // DataRow
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, " <=BE DataRow (during copy ignored)");
queryExecutor.skipMessage();
break;
default:
throw new IOException(
GT.tr("Unexpected packet type during copy: {0}", Integer.toString(c)));
}
// Collect errors into a neat chain for completeness
if (error != null) {
if (errors != null) {
error.setNextException(errors);
}
errors = error;
error = null;
}
}
if (errors != null) {
throw errors;
}
return op;
} finally {
/*
reset here in the finally block to make sure it really is cleared
*/
processingCopyResults.set(false);
}
}