public void handle()

in holo-client/src/main/java/com/alibaba/hologres/client/impl/handler/CopyActionHandler.java [109:219]


	public void handle(final CopyAction action) {
		try {
			action.getFuture().complete((Long) connectionHolder.retryExecute((conn) -> {
				PgConnection pgConn = conn.unwrap(PgConnection.class);
				CopyManager manager = new CopyManager(pgConn);
				TableSchema schema = action.getSchema();
				OutputStream os = action.getOs();
				try {
					long ret = -1;
					switch (action.getMode()) {
						case OUT:
							try {
								StringBuilder sb = new StringBuilder();
								sb.append("COPY (select ");
								boolean first = true;
								for (Column column : schema.getColumnSchema()) {
									if (!first) {
										sb.append(",");
									}
									first = false;
									sb.append(IdentifierUtil.quoteIdentifier(column.getName(), true));
								}
								sb.append(" from ").append(schema.getTableNameObj().getFullName());
								if (action.getStartShardId() > -1 && action.getEndShardId() > -1) {
									sb.append(" where hg_shard_id>=").append(action.getStartShardId()).append(" and hg_shard_id<").append(action.getEndShardId());
								}
								sb.append(") TO STDOUT DELIMITER ',' ESCAPE '\\' CSV QUOTE '\"' NULL AS 'N'");
								String sql = sb.toString();
								LOGGER.info("copy sql:{}", sql);
								os = action.getOs();
								CopyOut copyOut = manager.copyOut(sql);
								CopyContext copyContext = new CopyContext(conn, copyOut);
								action.getReadyToStart().complete(new CopyContext(conn, copyOut));
								long rowCount = doCopyOut(copyContext, os);

								if (os instanceof InternalPipedOutputStream) {
									os.close();
								}
								ret = rowCount;
							} catch (Exception e) {
								action.getReadyToStart().completeExceptionally(e);
								throw e;
							}
							break;
						case IN: {
							boolean hasException = false;
							try {
								if (action.getStartShardId() > -1 && action.getEndShardId() > -1) {
									StringBuilder sql = new StringBuilder("set hg_experimental_target_shard_list='");
									boolean first = true;
									for (int i = action.getStartShardId(); i < action.getEndShardId(); ++i) {
										if (!first) {
											sql.append(",");
										}
										first = false;
										sql.append(i);
									}
									sql.append("'");
									try (Statement stat = pgConn.createStatement()) {
										stat.execute(sql.toString());
									} catch (SQLException e) {
										LOGGER.error("", e);
									}
								}

								StringBuilder sb = new StringBuilder();
								sb.append("COPY ").append(schema.getTableNameObj().getFullName());
								sb.append(" FROM STDIN DELIMITER ',' ESCAPE '\\' CSV QUOTE '\"' NULL AS 'N'");
								String sql = sb.toString();
								LOGGER.info("copy sql:{}", sql);
								CopyIn copyIn = manager.copyIn(sql);
								CopyContext copyContext = new CopyContext(conn, copyIn);
								action.getReadyToStart().complete(copyContext);
								ret = doCopyIn(copyContext, action.getIs(), action.getBufferSize() > -1 ? action.getBufferSize() : config.getCopyInBufferSize());
							} catch (Exception e) {
								hasException = true;
								action.getReadyToStart().completeExceptionally(e);
								throw e;
							} finally {
								if (action.getStartShardId() > -1 && action.getEndShardId() > -1) {
									try (Statement stat = pgConn.createStatement()) {
										stat.execute("reset hg_experimental_target_shard_list");
									} catch (SQLException e) {
										if (hasException) {
											LOGGER.error("reset hg_experimental_target_shard_list failed", e);
										} else {
											throw e;
										}
									}
								}
							}
						}
						break;
						default:
							throw new SQLException("copy but InputStream and OutputStream both null");
					}
					return ret;
				} catch (Exception e) {
					if (os instanceof InternalPipedOutputStream) {
						try {
							os.close();
						} catch (IOException ignore) {
						}
					}
					throw new SQLException(e);
				}
			}, 1));
		} catch (HoloClientException e) {
			action.getFuture().completeExceptionally(e);
		}
	}