public void execute()

in baremaps-core/src/main/java/org/apache/baremaps/tasks/ImportShapefile.java [75:174]


  public void execute(WorkflowContext context) throws Exception {
    // Validate required parameters
    if (file == null) {
      throw new WorkflowException("Shapefile path cannot be null");
    }
    if (fileSrid == null) {
      throw new WorkflowException("Source SRID cannot be null");
    }
    if (database == null) {
      throw new WorkflowException("Database connection cannot be null");
    }
    if (databaseSrid == null) {
      throw new WorkflowException("Target SRID cannot be null");
    }

    var path = file.toAbsolutePath();
    logger.info("Importing shapefile from: {}", path);

    var dataSource = context.getDataSource(database);
    // Sanitize table name to prevent SQL injection
    var tableName = sanitizeTableName(
        file.getFileName().toString().replaceFirst("[.][^.]+$", "").toLowerCase());
    logger.info("Creating table: {}", tableName);

    // Set ThreadLocal DataSource for PostgresDdlExecutor to use
    PostgresDdlExecutor.setThreadLocalDataSource(dataSource);

    try {
      // Setup Calcite connection properties
      Properties info = new Properties();
      info.setProperty("lex", "MYSQL");
      info.setProperty("caseSensitive", "false");
      info.setProperty("unquotedCasing", "TO_LOWER");
      info.setProperty("quotedCasing", "TO_LOWER");
      info.setProperty("parserFactory", PostgresDdlExecutor.class.getName() + "#PARSER_FACTORY");

      // Create a ShapefileTable instance
      ShapefileTable shapefileTable = new ShapefileTable(path.toFile());

      // Create a temporary table name for the shapefile data
      String shapefileTableName = "shapefile_data_" + System.currentTimeMillis();

      try (Connection connection = DriverManager.getConnection("jdbc:calcite:", info)) {
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
        SchemaPlus rootSchema = calciteConnection.getRootSchema();

        // Register the shapefile table in the Calcite schema
        rootSchema.add(shapefileTableName, shapefileTable);

        // Create a table in PostgreSQL by selecting from the shapefile table
        String createTableSql = "CREATE TABLE " + tableName + " AS " +
            "SELECT * FROM " + shapefileTableName;

        logger.info("Executing SQL: {}", createTableSql);

        // Execute the DDL statement to create the table
        try (Statement statement = connection.createStatement()) {
          statement.execute(createTableSql);
        }

        // Set SRID on geometry column if specified
        try (Connection pgConnection = dataSource.getConnection();
            Statement stmt = pgConnection.createStatement()) {
          stmt.execute(String.format(
              "SELECT UpdateGeometrySRID('%s', 'geometry', %d)",
              tableName, databaseSrid));
        }

        // Verify that the table was created in PostgreSQL
        try (Connection pgConnection = dataSource.getConnection();
            Statement statement = pgConnection.createStatement();
            ResultSet resultSet = statement.executeQuery(
                "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '" +
                    tableName + "')")) {
          if (!resultSet.next() || !resultSet.getBoolean(1)) {
            throw new WorkflowException("Failed to create table: " + tableName);
          }
        }

        // Verify that the table has data
        try (Connection pgConnection = dataSource.getConnection();
            Statement statement = pgConnection.createStatement();
            ResultSet resultSet = statement.executeQuery(
                "SELECT COUNT(*) FROM " + tableName)) {
          if (resultSet.next()) {
            int count = resultSet.getInt(1);
            logger.info("Imported {} rows to table: {}", count, tableName);
            if (count == 0) {
              logger.warn("No rows were imported from shapefile to table: {}", tableName);
            }
          }
        }
      }
    } finally {
      // Clean up thread local storage
      PostgresDdlExecutor.clearThreadLocalDataSource();
    }

    logger.info("Successfully imported shapefile to table: {}", tableName);
  }