in server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java [193:362]
public void afterPropertiesSet() {
// Required parameter. Can be auto-overwritten by user options
String jdbcDriver = configuration.get(JDBC_DRIVER_PROPERTY_NAME);
assertMandatoryParameter(jdbcDriver, JDBC_DRIVER_PROPERTY_NAME, JDBC_DRIVER_OPTION_NAME);
try {
LOG.debug("JDBC driver: '{}'", jdbcDriver);
Class.forName(jdbcDriver);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
// Required parameter. Can be auto-overwritten by user options
jdbcUrl = configuration.get(JDBC_URL_PROPERTY_NAME);
assertMandatoryParameter(jdbcUrl, JDBC_URL_PROPERTY_NAME, JDBC_URL_OPTION_NAME);
// Required metadata
String dataSource = context.getDataSource();
if (StringUtils.isBlank(dataSource)) {
throw new IllegalArgumentException("Data source must be provided");
}
// Determine if the datasource is a table name or a query name
if (dataSource.startsWith(QUERY_NAME_PREFIX)) {
queryName = dataSource.substring(QUERY_NAME_PREFIX_LENGTH);
if (StringUtils.isBlank(queryName)) {
throw new IllegalArgumentException(String.format("Query name is not provided in data source [%s]", dataSource));
}
LOG.debug("Query name is {}", queryName);
} else {
tableName = dataSource;
LOG.debug("Table name is {}", tableName);
}
// Required metadata
columns = context.getTupleDescription();
// Optional parameters
batchSizeIsSetByUser = configuration.get(JDBC_STATEMENT_BATCH_SIZE_PROPERTY_NAME) != null;
if (context.getRequestType() == RequestContext.RequestType.WRITE_BRIDGE) {
batchSize = configuration.getInt(JDBC_STATEMENT_BATCH_SIZE_PROPERTY_NAME, DEFAULT_BATCH_SIZE);
if (batchSize == 0) {
batchSize = 1; // if user set to 0, it is the same as batchSize of 1
} else if (batchSize < 0) {
throw new IllegalArgumentException(String.format(
"Property %s has incorrect value %s : must be a non-negative integer", JDBC_STATEMENT_BATCH_SIZE_PROPERTY_NAME, batchSize));
}
}
// determine fetchSize for read operations, with different default values for MySQL driver and all others
int defaultFetchSize = jdbcDriver.startsWith(MYSQL_DRIVER_PREFIX) ? DEFAULT_MYSQL_FETCH_SIZE : DEFAULT_FETCH_SIZE;
fetchSize = configuration.getInt(JDBC_STATEMENT_FETCH_SIZE_PROPERTY_NAME, defaultFetchSize);
LOG.debug("Will be using fetchSize {}", fetchSize);
poolSize = context.getOption("POOL_SIZE", DEFAULT_POOL_SIZE);
String queryTimeoutString = configuration.get(JDBC_STATEMENT_QUERY_TIMEOUT_PROPERTY_NAME);
if (StringUtils.isNotBlank(queryTimeoutString)) {
try {
queryTimeout = Integer.parseUnsignedInt(queryTimeoutString);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(String.format(
"Property %s has incorrect value %s : must be a non-negative integer",
JDBC_STATEMENT_QUERY_TIMEOUT_PROPERTY_NAME, queryTimeoutString), e);
}
}
// Optional parameter. The default value is null
String quoteColumnsRaw = context.getOption("QUOTE_COLUMNS");
if (quoteColumnsRaw != null) {
quoteColumns = Boolean.parseBoolean(quoteColumnsRaw);
}
// Optional parameter. The default value is empty map
sessionConfiguration.putAll(getPropsWithPrefix(configuration, JDBC_SESSION_PROPERTY_PREFIX));
// Check forbidden symbols
// Note: PreparedStatement enables us to skip this check: its values are distinct from its SQL code
// However, SET queries cannot be executed this way. This is why we do this check
if (sessionConfiguration.entrySet().stream()
.anyMatch(
entry ->
StringUtils.containsAny(
entry.getKey(), FORBIDDEN_SESSION_PROPERTY_CHARACTERS
) ||
StringUtils.containsAny(
entry.getValue(), FORBIDDEN_SESSION_PROPERTY_CHARACTERS
)
)
) {
throw new IllegalArgumentException("Some session configuration parameter contains forbidden characters");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Session configuration: {}",
sessionConfiguration.entrySet().stream()
.map(entry -> "'" + entry.getKey() + "'='" + entry.getValue() + "'")
.collect(Collectors.joining(", "))
);
}
// Optional parameter. The default value is empty map
connectionConfiguration.putAll(getPropsWithPrefix(configuration, JDBC_CONNECTION_PROPERTY_PREFIX));
// Optional parameter. The default value depends on the database
String transactionIsolationString = configuration.get(JDBC_CONNECTION_TRANSACTION_ISOLATION, "NOT_PROVIDED");
transactionIsolation = TransactionIsolation.typeOf(transactionIsolationString);
// Set optional user parameter, taking into account impersonation setting for the server.
String jdbcUser = configuration.get(JDBC_USER_PROPERTY_NAME);
boolean impersonationEnabledForServer = configuration.getBoolean(CONFIG_KEY_SERVICE_USER_IMPERSONATION, false);
LOG.debug("JDBC impersonation is {}enabled for server {}", impersonationEnabledForServer ? "" : "not ", context.getServerName());
if (impersonationEnabledForServer) {
if (Utilities.isSecurityEnabled(configuration) && StringUtils.startsWith(jdbcUrl, HIVE_URL_PREFIX)) {
// secure impersonation for Hive JDBC driver requires setting URL fragment that cannot be overwritten by properties
String updatedJdbcUrl = HiveJdbcUtils.updateImpersonationPropertyInHiveJdbcUrl(jdbcUrl, context.getUser());
LOG.debug("Replaced JDBC URL {} with {}", jdbcUrl, updatedJdbcUrl);
jdbcUrl = updatedJdbcUrl;
} else {
// the jdbcUser is the GPDB user
jdbcUser = context.getUser();
}
}
if (jdbcUser != null) {
LOG.debug("Effective JDBC user {}", jdbcUser);
connectionConfiguration.setProperty("user", jdbcUser);
} else {
LOG.debug("JDBC user has not been set");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Connection configuration: {}",
connectionConfiguration.entrySet().stream()
.map(entry -> "'" + entry.getKey() + "'='" + entry.getValue() + "'")
.collect(Collectors.joining(", "))
);
}
// This must be the last parameter parsed, as we output connectionConfiguration earlier
// Optional parameter. By default, corresponding connectionConfiguration property is not set
if (jdbcUser != null) {
String jdbcPassword = configuration.get(JDBC_PASSWORD_PROPERTY_NAME);
if (jdbcPassword != null) {
LOG.debug("Connection password: {}", ConnectionManager.maskPassword(jdbcPassword));
connectionConfiguration.setProperty("password", jdbcPassword);
}
}
// connection pool is optional, enabled by default
isConnectionPoolUsed = configuration.getBoolean(JDBC_CONNECTION_POOL_ENABLED_PROPERTY_NAME, true);
LOG.debug("Connection pool is {}enabled", isConnectionPoolUsed ? "" : "not ");
if (isConnectionPoolUsed) {
poolConfiguration = new Properties();
// for PXF upgrades where jdbc-site template has not been updated, make sure there're sensible defaults
poolConfiguration.setProperty("maximumPoolSize", "15");
poolConfiguration.setProperty("connectionTimeout", "30000");
poolConfiguration.setProperty("idleTimeout", "30000");
poolConfiguration.setProperty("minimumIdle", "0");
// apply values read from the template
poolConfiguration.putAll(getPropsWithPrefix(configuration, JDBC_CONNECTION_POOL_PROPERTY_PREFIX));
// packaged Hive JDBC Driver does not support connection.isValid() method, so we need to force set
// connectionTestQuery parameter in this case, unless already set by the user
if (jdbcUrl.startsWith(HIVE_URL_PREFIX) && HIVE_DEFAULT_DRIVER_CLASS.equals(jdbcDriver) && poolConfiguration.getProperty("connectionTestQuery") == null) {
poolConfiguration.setProperty("connectionTestQuery", "SELECT 1");
}
// get the qualifier for connection pool, if configured. Might be used when connection session authorization is employed
// to switch effective user once connection is established
poolQualifier = configuration.get(JDBC_POOL_QUALIFIER_PROPERTY_NAME);
}
}