in src/main/java/com/amazonaws/services/glue/catalog/HiveGlueCatalogSyncAgent.java [203:253]
public HiveGlueCatalogSyncAgent(final Configuration conf) throws Exception {
super(conf);
this.config = conf;
String noopSleepDuration = this.config.get("no-event-sleep-duration");
if (noopSleepDuration == null) {
this.noEventSleepDuration = 1000;
} else {
this.noEventSleepDuration = new Integer(noopSleepDuration).intValue();
}
String reconnectSleepDuration = conf.get("reconnect-failed-sleep-duration");
if (reconnectSleepDuration == null) {
this.reconnectSleepDuration = 1000;
} else {
this.reconnectSleepDuration = new Integer(noopSleepDuration).intValue();
}
this.info = new Properties();
this.info.put("log_path", "/tmp/jdbc.log");
this.info.put("log_level", "ERROR");
this.info.put("s3_staging_dir", config.get(GLUE_CATALOG_S3_STAGING_DIR));
dropTableIfExists = config.getBoolean(GLUE_CATALOG_DROP_TABLE_IF_EXISTS, false);
createMissingDB = config.getBoolean(GLUE_CATALOG_CREATE_MISSING_DB, true);
suppressAllDropEvents = config.getBoolean(SUPPRESS_ALL_DROP_EVENTS, false);
this.athenaURL = conf.get(ATHENA_JDBC_URL, DEFAULT_ATHENA_CONNECTION_URL);
if (config.get(GLUE_CATALOG_USER_KEY) != null) {
info.put("user", config.get(GLUE_CATALOG_USER_KEY));
info.put("password", config.get(GLUE_CATALOG_USER_SECRET));
} else {
this.info.put("aws_credentials_provider_class",
com.amazonaws.auth.InstanceProfileCredentialsProvider.class.getName());
}
ddlQueue = new ConcurrentLinkedQueue<>();
configureAthenaConnection();
// start the queue processor thread
AthenaQueueProcessor athenaQueueProcessor = new AthenaQueueProcessor(this.config);
queueProcessor = new Thread(athenaQueueProcessor, "GlueSyncThread");
queueProcessor.start();
// add a shutdown hook to close the connections
Runtime.getRuntime()
.addShutdownHook(new Thread(new SyncAgentShutdownRoutine(athenaQueueProcessor), "Shutdown-thread"));
LOG.info(String.format("%s online, connected to %s", this.getClass().getCanonicalName(), this.athenaURL));
}