private void executeCopy()

in flink_sink_adbpg_datastream/src/main/java/Adb4PgTableSink.java [573:621]


	/**
	 * Loads the content of {@code inputStream} into the target table with a
	 * {@code COPY ... from STDIN} statement, retrying failed attempts.
	 *
	 * <p>Every attempt calls {@code connect()}, rewinds the stream to the position it had when
	 * this method was entered, and runs the COPY through a {@code CopyManager}. The table and
	 * column identifiers are double-quoted when {@code caseSensitive} is set. After a failed
	 * attempt {@code closeConnection()} is called and the attempt is either rethrown or, after
	 * sleeping for {@code retryWaitTime}, repeated. {@code closeConnection()} is also called once
	 * the loop ends.
	 *
	 * @param inputStream the data to copy; when {@code null} the method returns immediately.
	 *                    The stream is rewound with {@code mark}/{@code reset} on each attempt.
	 * @throws SQLException if the COPY fails with a duplicate-key / unique-constraint message,
	 *                      or if the attempts are exhausted. An {@link IOException} is reported
	 *                      as an {@code SQLException} that carries it as the cause.
	 */
	private void executeCopy(InputStream inputStream) throws SQLException {
		if (inputStream == null){
			return;
		}
		// Remember the current position so that each attempt can replay the same bytes.
		// NOTE(review): a read limit of 0 only works for streams that ignore the limit
		// (e.g. ByteArrayInputStream) -- confirm what callers pass in.
		inputStream.mark(0);
		int retryTime = 0;
		// NOTE(review): when maxRetryTime <= 0 the loop body never runs and nothing is copied
		// -- confirm that the configuration is validated elsewhere.
		while (retryTime++ < maxRetryTime) {
			connect();
			try {
				// Rewind to the marked position and re-mark it for a possible further attempt.
				inputStream.reset();
				inputStream.mark(0);
				CopyManager manager = new CopyManager((BaseConnection) (dataSource.getConnection().getConnection()));
				if (caseSensitive) {
					manager.copyIn("COPY \"" + targetSchema + "\".\"" + tableName + "\"( " +fieldNamesCaseSensitive +" )" + " from STDIN", inputStream);
				}
				else {
					manager.copyIn("COPY " + targetSchema + "." + tableName + "( " +fieldNames +" )" + " from STDIN", inputStream);
				}
				break;
			} catch (SQLException e) {
				closeConnection();
				if (isNonRetryableCopyFailure(e, retryTime)) {
					throw e;
				}
				sleepBeforeCopyRetry();
			} catch (IOException e) {
				closeConnection();
				if (isNonRetryableCopyFailure(e, retryTime)) {
					// Attach the I/O failure as the cause so the root problem is not lost.
					throw new SQLException("cannot execute copy.", e);
				}
				sleepBeforeCopyRetry();
			}
		}
		closeConnection();
	}

	/**
	 * Decides whether a failed COPY attempt has to be rethrown instead of retried.
	 *
	 * @param failure   the exception raised by the attempt
	 * @param retryTime the attempt counter, already incremented by the retry loop (1 for the
	 *                  first attempt)
	 * @return {@code true} when the message contains both "duplicate key" and
	 *         "violates unique constraint", or when {@code retryTime >= maxRetryTime - 1}
	 */
	private boolean isNonRetryableCopyFailure(Exception failure, int retryTime) {
		String message = failure.getMessage();
		boolean duplicateKey = message != null
				&& message.indexOf("duplicate key") != -1
				&& message.indexOf("violates unique constraint") != -1;
		// NOTE(review): because retryTime starts at 1, this bound gives up after
		// maxRetryTime - 1 attempts (one attempt when maxRetryTime is 1) -- confirm intended.
		return duplicateKey || retryTime >= maxRetryTime - 1;
	}

	/**
	 * Sleeps for {@code retryWaitTime} before the next COPY attempt. Failures of the sleep are
	 * logged and not propagated; an interruption additionally restores the interrupt flag.
	 */
	private void sleepBeforeCopyRetry() {
		try {
			Thread.sleep(retryWaitTime);
		} catch (InterruptedException e1) {
			// Restore the flag so code further up the stack can still observe the interruption.
			Thread.currentThread().interrupt();
			LOG.error("Thread sleep exception in AdbpgOutputFormat class", e1);
		} catch (Exception e1) {
			LOG.error("Thread sleep exception in AdbpgOutputFormat class", e1);
		}
	}