public void run()

in src/main/java/com/amazonaws/services/glue/catalog/HiveGlueCatalogSyncAgent.java [111:195]


		public void run() {
			// run forever or until stop is called, and continue running until the queue is
			// empty when stop is called
			while (true) {
				if (!ddlQueue.isEmpty()) {
					String query = ddlQueue.poll();

					LOG.info("Working on " + query);
					// Exception logic: if it's a network issue keep retrying. Anything else log to
					// CWL and move on.
					boolean completed = false;
					while (!completed) {
						try {
							Statement athenaStmt = athenaConnection.createStatement();
							cwlr.sendToCWL("Trying to execute: " + query);
							athenaStmt.execute(query);
							athenaStmt.close();
							completed = true;
						} catch (SQLException e) {
							if (e instanceof SQLRecoverableException || e instanceof SQLTimeoutException) {
								try {
									configureAthenaConnection();
								} catch (SQLException e1) {
									// this will probably be because we can't open the connection
									try {
										Thread.sleep(reconnectSleepDuration);
									} catch (InterruptedException e2) {
										e2.printStackTrace();
									}
								}
							} else {
								// Athena's JDBC Driver just throws a generic SQLException
								// Only way to identify exception type is through string parsing :O=
								if (e.getMessage().contains("AlreadyExistsException") && dropTableIfExists) {
									Matcher matcher = PATTERN.matcher(query);
									matcher.find();
									String tableName = matcher.group(1);
									try {
										cwlr.sendToCWL("Dropping table " + tableName);
										Statement athenaStmt = athenaConnection.createStatement();
										athenaStmt.execute("drop table " + tableName);
										cwlr.sendToCWL("Creating table " + tableName + " after dropping ");
										athenaStmt.execute(query);
										athenaStmt.close();
										completed = true;
									} catch (Exception e2) {
										cwlr.sendToCWL("Unable to drop and recreate  " + tableName);
										cwlr.sendToCWL("ERROR: " + e.getMessage());

									}
								} else if (e.getMessage().contains("Database does not exist:") && createMissingDB) {
									try {
										String dbName = e.getMessage().split(":")[3].trim();
										cwlr.sendToCWL("Trying to create database  " + dbName);
										Statement athenaStmt = athenaConnection.createStatement();
										athenaStmt.execute("Create database if not exists " + dbName);
										cwlr.sendToCWL("Retrying table creation:" + query);
										athenaStmt.execute(query);
										athenaStmt.close();
										completed = true;
									} catch (Throwable e2) {
										LOG.info("ERROR: " + e.getMessage());
										LOG.info("DB doesn't exist for: " + query);
									}
								} else {
									LOG.info("Unable to complete query: " + query);
									cwlr.sendToCWL("ERROR: " + e.getMessage());
									completed = true;
								}
							}
						}
					}

				} else {
					// put the thread to sleep for a configured duration
					try {
						LOG.debug(String.format("DDL Queue is empty. Sleeping for %s, queue state is %s",
								noEventSleepDuration, ddlQueue.size()));
						Thread.sleep(noEventSleepDuration);
					} catch (InterruptedException e) {
						LOG.error(e.getMessage());
					}
				}
			}
		}