in holo-client/src/main/java/com/alibaba/hologres/client/impl/handler/CopyActionHandler.java [109:219]
public void handle(final CopyAction action) {
try {
action.getFuture().complete((Long) connectionHolder.retryExecute((conn) -> {
PgConnection pgConn = conn.unwrap(PgConnection.class);
CopyManager manager = new CopyManager(pgConn);
TableSchema schema = action.getSchema();
OutputStream os = action.getOs();
try {
long ret = -1;
switch (action.getMode()) {
case OUT:
try {
StringBuilder sb = new StringBuilder();
sb.append("COPY (select ");
boolean first = true;
for (Column column : schema.getColumnSchema()) {
if (!first) {
sb.append(",");
}
first = false;
sb.append(IdentifierUtil.quoteIdentifier(column.getName(), true));
}
sb.append(" from ").append(schema.getTableNameObj().getFullName());
if (action.getStartShardId() > -1 && action.getEndShardId() > -1) {
sb.append(" where hg_shard_id>=").append(action.getStartShardId()).append(" and hg_shard_id<").append(action.getEndShardId());
}
sb.append(") TO STDOUT DELIMITER ',' ESCAPE '\\' CSV QUOTE '\"' NULL AS 'N'");
String sql = sb.toString();
LOGGER.info("copy sql:{}", sql);
os = action.getOs();
CopyOut copyOut = manager.copyOut(sql);
CopyContext copyContext = new CopyContext(conn, copyOut);
action.getReadyToStart().complete(new CopyContext(conn, copyOut));
long rowCount = doCopyOut(copyContext, os);
if (os instanceof InternalPipedOutputStream) {
os.close();
}
ret = rowCount;
} catch (Exception e) {
action.getReadyToStart().completeExceptionally(e);
throw e;
}
break;
case IN: {
boolean hasException = false;
try {
if (action.getStartShardId() > -1 && action.getEndShardId() > -1) {
StringBuilder sql = new StringBuilder("set hg_experimental_target_shard_list='");
boolean first = true;
for (int i = action.getStartShardId(); i < action.getEndShardId(); ++i) {
if (!first) {
sql.append(",");
}
first = false;
sql.append(i);
}
sql.append("'");
try (Statement stat = pgConn.createStatement()) {
stat.execute(sql.toString());
} catch (SQLException e) {
LOGGER.error("", e);
}
}
StringBuilder sb = new StringBuilder();
sb.append("COPY ").append(schema.getTableNameObj().getFullName());
sb.append(" FROM STDIN DELIMITER ',' ESCAPE '\\' CSV QUOTE '\"' NULL AS 'N'");
String sql = sb.toString();
LOGGER.info("copy sql:{}", sql);
CopyIn copyIn = manager.copyIn(sql);
CopyContext copyContext = new CopyContext(conn, copyIn);
action.getReadyToStart().complete(copyContext);
ret = doCopyIn(copyContext, action.getIs(), action.getBufferSize() > -1 ? action.getBufferSize() : config.getCopyInBufferSize());
} catch (Exception e) {
hasException = true;
action.getReadyToStart().completeExceptionally(e);
throw e;
} finally {
if (action.getStartShardId() > -1 && action.getEndShardId() > -1) {
try (Statement stat = pgConn.createStatement()) {
stat.execute("reset hg_experimental_target_shard_list");
} catch (SQLException e) {
if (hasException) {
LOGGER.error("reset hg_experimental_target_shard_list failed", e);
} else {
throw e;
}
}
}
}
}
break;
default:
throw new SQLException("copy but InputStream and OutputStream both null");
}
return ret;
} catch (Exception e) {
if (os instanceof InternalPipedOutputStream) {
try {
os.close();
} catch (IOException ignore) {
}
}
throw new SQLException(e);
}
}, 1));
} catch (HoloClientException e) {
action.getFuture().completeExceptionally(e);
}
}