public void addTable()

in tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java [272:395]


  public void addTable(final CatalogProtos.TableDescProto table) throws CatalogException {
    Connection conn = null;
    PreparedStatement pstmt = null;
    ResultSet res = null;

    try {
      conn = getConnection();
      conn.setAutoCommit(false);
      String tableName = table.getId().toLowerCase();

      String sql = String.format("INSERT INTO %s (%s, path, store_type) VALUES(?, ?, ?) ", TB_TABLES, C_TABLE_ID);

      if (LOG.isDebugEnabled()) {
        LOG.debug(sql);
      }

      pstmt = conn.prepareStatement(sql);
      pstmt.setString(1, tableName);
      pstmt.setString(2, table.getPath());
      pstmt.setString(3, table.getMeta().getStoreType().name());
      pstmt.executeUpdate();
      pstmt.close();

      String tidSql = String.format("SELECT TID from %s WHERE %s = ?", TB_TABLES, C_TABLE_ID);
      pstmt = conn.prepareStatement(tidSql);
      pstmt.setString(1, tableName);
      res = pstmt.executeQuery();

      if (!res.next()) {
        throw new CatalogException("ERROR: there is no tid matched to " + table.getId());
      }

      int tid = res.getInt("TID");
      res.close();
      pstmt.close();

      String colSql = String.format("INSERT INTO %s (TID, %s, column_id, column_name, data_type, type_length)"
          + " VALUES(?, ?, ?, ?, ?, ?) ", TB_COLUMNS, C_TABLE_ID);

      if (LOG.isDebugEnabled()) {
        LOG.debug(sql);
      }

      pstmt = conn.prepareStatement(colSql);
      for(int i = 0; i < table.getSchema().getFieldsCount(); i++) {
        ColumnProto col = table.getSchema().getFields(i);
        pstmt.setInt(1, tid);
        pstmt.setString(2, tableName);
        pstmt.setInt(3, i);
        pstmt.setString(4, CatalogUtil.extractSimpleName(col.getName()));
        pstmt.setString(5, col.getDataType().getType().name());
        pstmt.setInt(6, (col.getDataType().hasLength() ? col.getDataType().getLength() : 0));
        pstmt.addBatch();
        pstmt.clearParameters();
      }
      pstmt.executeBatch();
      pstmt.close();

      if(table.getMeta().hasParams()) {
        String optSql = String.format("INSERT INTO %s (%s, key_, value_) VALUES(?, ?, ?)", TB_OPTIONS, C_TABLE_ID);

        if (LOG.isDebugEnabled()) {
          LOG.debug(optSql);
        }

        pstmt = conn.prepareStatement(optSql);
        for (CatalogProtos.KeyValueProto entry : table.getMeta().getParams().getKeyvalList()) {
          pstmt.setString(1, tableName);
          pstmt.setString(2, entry.getKey());
          pstmt.setString(3, entry.getValue());
          pstmt.addBatch();
          pstmt.clearParameters();
        }
        pstmt.executeBatch();
        pstmt.close();
      }

      if (table.hasStats()) {

        String statSql =
            String.format("INSERT INTO %s (%s, num_rows, num_bytes) VALUES(?, ?, ?)", TB_STATISTICS, C_TABLE_ID);

        if (LOG.isDebugEnabled()) {
          LOG.debug(statSql);
        }

        pstmt = conn.prepareStatement(statSql);
        pstmt.setString(1, tableName);
        pstmt.setLong(2, table.getStats().getNumRows());
        pstmt.setLong(3, table.getStats().getNumBytes());
        pstmt.executeUpdate();
        pstmt.close();
      }

      if(table.hasPartition()) {
        String partSql =
            String.format("INSERT INTO %s (%s, partition_type, expression, expression_schema) VALUES(?, ?, ?, ?)",
                TB_PARTITION_METHODS, C_TABLE_ID);

        if (LOG.isDebugEnabled()) {
          LOG.debug(partSql);
        }

        pstmt = conn.prepareStatement(partSql);
        pstmt.setString(1, tableName);
        pstmt.setString(2, table.getPartition().getPartitionType().name());
        pstmt.setString(3, table.getPartition().getExpression());
        pstmt.setBytes(4, table.getPartition().getExpressionSchema().toByteArray());
        pstmt.executeUpdate();
      }

      // If there is no error, commit the changes.
      conn.commit();
    } catch (SQLException se) {
      try {
        // If there is any error, rollback the changes.
        conn.rollback();
      } catch (SQLException se2) {
      }
      throw new CatalogException(se);
    } finally {
      CatalogUtil.closeQuietly(conn, pstmt, res);
    }
  }