in src/main/java/com/amazonaws/services/glue/catalog/HiveGlueCatalogSyncAgent.java [111:195]
public void run() {
// run forever or until stop is called, and continue running until the queue is
// empty when stop is called
while (true) {
if (!ddlQueue.isEmpty()) {
String query = ddlQueue.poll();
LOG.info("Working on " + query);
// Exception logic: if it's a network issue keep retrying. Anything else log to
// CWL and move on.
boolean completed = false;
while (!completed) {
try {
Statement athenaStmt = athenaConnection.createStatement();
cwlr.sendToCWL("Trying to execute: " + query);
athenaStmt.execute(query);
athenaStmt.close();
completed = true;
} catch (SQLException e) {
if (e instanceof SQLRecoverableException || e instanceof SQLTimeoutException) {
try {
configureAthenaConnection();
} catch (SQLException e1) {
// this will probably be because we can't open the connection
try {
Thread.sleep(reconnectSleepDuration);
} catch (InterruptedException e2) {
e2.printStackTrace();
}
}
} else {
// Athena's JDBC Driver just throws a generic SQLException
// Only way to identify exception type is through string parsing :O=
if (e.getMessage().contains("AlreadyExistsException") && dropTableIfExists) {
Matcher matcher = PATTERN.matcher(query);
matcher.find();
String tableName = matcher.group(1);
try {
cwlr.sendToCWL("Dropping table " + tableName);
Statement athenaStmt = athenaConnection.createStatement();
athenaStmt.execute("drop table " + tableName);
cwlr.sendToCWL("Creating table " + tableName + " after dropping ");
athenaStmt.execute(query);
athenaStmt.close();
completed = true;
} catch (Exception e2) {
cwlr.sendToCWL("Unable to drop and recreate " + tableName);
cwlr.sendToCWL("ERROR: " + e.getMessage());
}
} else if (e.getMessage().contains("Database does not exist:") && createMissingDB) {
try {
String dbName = e.getMessage().split(":")[3].trim();
cwlr.sendToCWL("Trying to create database " + dbName);
Statement athenaStmt = athenaConnection.createStatement();
athenaStmt.execute("Create database if not exists " + dbName);
cwlr.sendToCWL("Retrying table creation:" + query);
athenaStmt.execute(query);
athenaStmt.close();
completed = true;
} catch (Throwable e2) {
LOG.info("ERROR: " + e.getMessage());
LOG.info("DB doesn't exist for: " + query);
}
} else {
LOG.info("Unable to complete query: " + query);
cwlr.sendToCWL("ERROR: " + e.getMessage());
completed = true;
}
}
}
}
} else {
// put the thread to sleep for a configured duration
try {
LOG.debug(String.format("DDL Queue is empty. Sleeping for %s, queue state is %s",
noEventSleepDuration, ddlQueue.size()));
Thread.sleep(noEventSleepDuration);
} catch (InterruptedException e) {
LOG.error(e.getMessage());
}
}
}
}