public void open()

in flink_sink_adbpg_datastream/src/main/java/Adb4PgTableSink.java [211:312]


	/**
	 * Prepares this sink instance for writing.
	 *
	 * <p>After delegating to {@code super.open}, the method (while holding the
	 * {@code Adb4PgTableSink.class} monitor):
	 * <ol>
	 *   <li>looks up or creates and initializes the Druid data source, registering a
	 *       newly created one in {@code dataSourcePool};</li>
	 *   <li>splits the schema's field names into primary-key and non-primary-key
	 *       column lists (plain, double-quoted, and {@code excluded.}-prefixed
	 *       variants) when primary keys are configured;</li>
	 *   <li>starts a single-threaded daemon scheduler that calls {@code sync()}
	 *       whenever at least {@code batchWriteTimeout} milliseconds have elapsed
	 *       since {@code lastWriteTime}.</li>
	 * </ol>
	 * Finally the effective configuration is logged. The password is never written
	 * to the log or to exception messages.
	 *
	 * @param parameters configuration forwarded unchanged to {@code super.open}
	 * @throws RuntimeException if the data source cannot be initialized, or if
	 *         {@code writeMode} is 2 (upsert) and no primary keys are configured
	 * @throws Exception if {@code super.open} fails
	 */
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);

		synchronized (Adb4PgTableSink.class) {
			dataSourceKey = url + userName + password + tableName;
			// NOTE(review): on java.util.Hashtable / ConcurrentHashMap, contains(Object)
			// tests VALUES, not keys. If dataSourcePool is one of those types this lookup
			// never hits and a new pool is created on every open(). Left unchanged here
			// because switching to containsKey would start sharing one DruidDataSource
			// between sink instances, which is only safe if close() does not close a
			// shared instance - confirm before changing.
			if (dataSourcePool.contains(dataSourceKey)) {
				dataSource = dataSourcePool.get(dataSourceKey);
			} else {
				dataSource = new DruidDataSource();
				dataSource.setUrl(url);
				dataSource.setUsername(userName);
				dataSource.setPassword(password);
				dataSource.setDriverClassName(driverClassName);
				dataSource.setMaxActive(connectionMaxActive);
				dataSource.setInitialSize(connectionInitialSize);
				dataSource.setMaxWait(maxWait);
				dataSource.setMinIdle(connectionMinIdle);
				dataSource.setTimeBetweenEvictionRunsMillis(2000);
				dataSource.setMinEvictableIdleTimeMillis(600000);
				dataSource.setMaxEvictableIdleTimeMillis(900000);
				dataSource.setValidationQuery("select 1");
				dataSource.setTestOnBorrow(false);
				dataSource.setTestWhileIdle(connectionTestWhileIdle);
				dataSource.setRemoveAbandoned(true);
				dataSource.setRemoveAbandonedTimeout(removeAbandonedTimeout);
				try {
					dataSource.init();
				} catch (SQLException e) {
					LOG.error("Init DataSource Or Get Connection Error!", e);
					// Credentials must not leak into exception messages (they end up in
					// logs and job-failure reports), so the password is masked.
					throw new RuntimeException("cannot get connection for url: " + url + ", userName: " + userName + ", password: ******", e);
				}
				dataSourcePool.put(dataSourceKey, dataSource);
			}
			if (primaryKeys == null || primaryKeys.isEmpty()) {
				existsPrimaryKeys = false;
				// Upsert needs a conflict target, so it cannot work without primary keys.
				if (2 == this.writeMode) {
					throw new RuntimeException("primary key cannot be empty when setting write mode to 2:upsert.");
				}
			}
			else {
				existsPrimaryKeys = true;
				Joiner joinerOnComma = Joiner.on(",").useForNull("null");
				// Arrays are sized on the assumption that every primary key appears
				// exactly once in the schema.
				String[] primaryFieldNamesStr = new String[primaryKeys.size()];
				String[] nonPrimaryFieldNamesStr = new String[schema.getLength() - primaryKeys.size()];
				String[] primaryFieldNamesStrCaseSensitive = new String[primaryKeys.size()];
				String[] nonPrimaryFieldNamesStrCaseSensitive = new String[schema.getLength() - primaryKeys.size()];
				String[] excludedNonPrimaryFieldNamesStr = new String[schema.getLength() - primaryKeys.size()];
				String[] excludedNonPrimaryFieldNamesStrCaseSensitive = new String[schema.getLength() - primaryKeys.size()];
				int primaryIndex = 0;
				int excludedIndex = 0;
				for (int i = 0; i < schema.getLength(); i++) {
					String fieldName = schema.getFieldNames().get(i);
					if (primaryKeys.contains(fieldName)) {
						primaryFieldNamesStr[primaryIndex] = fieldName;
						// Double quotes give the case-sensitive (quoted identifier) form.
						primaryFieldNamesStrCaseSensitive[primaryIndex++] = "\"" + fieldName + "\"";
					}
					else {
						nonPrimaryFieldNamesStr[excludedIndex] = fieldName;
						nonPrimaryFieldNamesStrCaseSensitive[excludedIndex] = "\"" + fieldName + "\"";
						excludedNonPrimaryFieldNamesStr[excludedIndex] = "excluded." + fieldName;
						excludedNonPrimaryFieldNamesStrCaseSensitive[excludedIndex++] = "excluded.\"" + fieldName + "\"";
					}
				}
				this.primaryFieldNames = joinerOnComma.join(primaryFieldNamesStr);
				this.nonPrimaryFieldNames = joinerOnComma.join(nonPrimaryFieldNamesStr);
				this.primaryFieldNamesCaseSensitive = joinerOnComma.join(primaryFieldNamesStrCaseSensitive);
				this.nonPrimaryFieldNamesCaseSensitive = joinerOnComma.join(nonPrimaryFieldNamesStrCaseSensitive);
				this.excludedNonPrimaryFieldNames = joinerOnComma.join(excludedNonPrimaryFieldNamesStr);
				this.excludedNonPrimaryFieldNamesCaseSensitive = joinerOnComma.join(excludedNonPrimaryFieldNamesStrCaseSensitive);
			}
			// Background flusher: one daemon thread that periodically flushes the
			// buffer once no write has happened for batchWriteTimeout milliseconds.
			executorService = new ScheduledThreadPoolExecutor(1,
					new BasicThreadFactory.Builder().namingPattern("adbpg-flusher-%d").daemon(true).build());
			executorService.scheduleAtFixedRate(new Runnable() {
				@Override
				public void run() {
					try {
						if(System.currentTimeMillis() - lastWriteTime >= batchWriteTimeout){
							sync();
						}
					} catch (Exception e) {
						// Catching here keeps the periodic task alive: an exception
						// escaping run() would cancel all further scheduled executions.
						LOG.error("flush buffer to ADBPG failed", e);
					}
				}
			}, batchWriteTimeout, batchWriteTimeout, TimeUnit.MILLISECONDS);
		}
		// The password is masked on purpose: secrets must never be written to logs.
		LOG.info("connector created using url=" + url + ", " +
				"tableName=" + tableName + ", " +
				"userName=" + userName + ", " +
				"password=******, " +
				"maxRetries=" + maxRetryTime + ", " +
				"batchSize=" + batchSize + ", " +
				"connectionMaxActive=" + connectionMaxActive + ", " +
				"batchWriteTimeoutMs=" + batchWriteTimeout + ", " +
				"conflictMode=" + conflictMode + ", " +
				"timeZone=" + timeZone + ", " +
				"useCopy=" + useCopy + ", " +
				"targetSchema=" + targetSchema + ", " +
				"exceptionMode=" + exceptionMode + ", " +
				"reserveMs=" + reserveMs + ", " +
				"caseSensitive=" + caseSensitive +", " +
				"writeMode=" + writeMode);
	}