in tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java [272:395]
public void addTable(final CatalogProtos.TableDescProto table) throws CatalogException {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet res = null;
try {
conn = getConnection();
conn.setAutoCommit(false);
String tableName = table.getId().toLowerCase();
String sql = String.format("INSERT INTO %s (%s, path, store_type) VALUES(?, ?, ?) ", TB_TABLES, C_TABLE_ID);
if (LOG.isDebugEnabled()) {
LOG.debug(sql);
}
pstmt = conn.prepareStatement(sql);
pstmt.setString(1, tableName);
pstmt.setString(2, table.getPath());
pstmt.setString(3, table.getMeta().getStoreType().name());
pstmt.executeUpdate();
pstmt.close();
String tidSql = String.format("SELECT TID from %s WHERE %s = ?", TB_TABLES, C_TABLE_ID);
pstmt = conn.prepareStatement(tidSql);
pstmt.setString(1, tableName);
res = pstmt.executeQuery();
if (!res.next()) {
throw new CatalogException("ERROR: there is no tid matched to " + table.getId());
}
int tid = res.getInt("TID");
res.close();
pstmt.close();
String colSql = String.format("INSERT INTO %s (TID, %s, column_id, column_name, data_type, type_length)"
+ " VALUES(?, ?, ?, ?, ?, ?) ", TB_COLUMNS, C_TABLE_ID);
if (LOG.isDebugEnabled()) {
LOG.debug(sql);
}
pstmt = conn.prepareStatement(colSql);
for(int i = 0; i < table.getSchema().getFieldsCount(); i++) {
ColumnProto col = table.getSchema().getFields(i);
pstmt.setInt(1, tid);
pstmt.setString(2, tableName);
pstmt.setInt(3, i);
pstmt.setString(4, CatalogUtil.extractSimpleName(col.getName()));
pstmt.setString(5, col.getDataType().getType().name());
pstmt.setInt(6, (col.getDataType().hasLength() ? col.getDataType().getLength() : 0));
pstmt.addBatch();
pstmt.clearParameters();
}
pstmt.executeBatch();
pstmt.close();
if(table.getMeta().hasParams()) {
String optSql = String.format("INSERT INTO %s (%s, key_, value_) VALUES(?, ?, ?)", TB_OPTIONS, C_TABLE_ID);
if (LOG.isDebugEnabled()) {
LOG.debug(optSql);
}
pstmt = conn.prepareStatement(optSql);
for (CatalogProtos.KeyValueProto entry : table.getMeta().getParams().getKeyvalList()) {
pstmt.setString(1, tableName);
pstmt.setString(2, entry.getKey());
pstmt.setString(3, entry.getValue());
pstmt.addBatch();
pstmt.clearParameters();
}
pstmt.executeBatch();
pstmt.close();
}
if (table.hasStats()) {
String statSql =
String.format("INSERT INTO %s (%s, num_rows, num_bytes) VALUES(?, ?, ?)", TB_STATISTICS, C_TABLE_ID);
if (LOG.isDebugEnabled()) {
LOG.debug(statSql);
}
pstmt = conn.prepareStatement(statSql);
pstmt.setString(1, tableName);
pstmt.setLong(2, table.getStats().getNumRows());
pstmt.setLong(3, table.getStats().getNumBytes());
pstmt.executeUpdate();
pstmt.close();
}
if(table.hasPartition()) {
String partSql =
String.format("INSERT INTO %s (%s, partition_type, expression, expression_schema) VALUES(?, ?, ?, ?)",
TB_PARTITION_METHODS, C_TABLE_ID);
if (LOG.isDebugEnabled()) {
LOG.debug(partSql);
}
pstmt = conn.prepareStatement(partSql);
pstmt.setString(1, tableName);
pstmt.setString(2, table.getPartition().getPartitionType().name());
pstmt.setString(3, table.getPartition().getExpression());
pstmt.setBytes(4, table.getPartition().getExpressionSchema().toByteArray());
pstmt.executeUpdate();
}
// If there is no error, commit the changes.
conn.commit();
} catch (SQLException se) {
try {
// If there is any error, rollback the changes.
conn.rollback();
} catch (SQLException se2) {
}
throw new CatalogException(se);
} finally {
CatalogUtil.closeQuietly(conn, pstmt, res);
}
}