public void execute()

in baremaps-core/src/main/java/org/apache/baremaps/tasks/ImportGeoPackage.java [78:200]


  /**
   * Imports every feature table of the configured GeoPackage file into the target database.
   *
   * <p>The GeoPackage is registered as a temporary sub-schema of an in-memory Calcite connection
   * and each feature table is copied with a {@code CREATE TABLE ... AS SELECT} statement issued
   * through that connection. The resolved data source is published to {@code PostgresDdlExecutor}
   * through its thread-local for the duration of the call and is always cleared afterwards. After
   * each copy the target SRID is applied, the table's existence is verified and its row count is
   * logged.
   *
   * <p>If the GeoPackage contains no feature tables a warning is logged and the method returns
   * without importing anything.
   *
   * <p>NOTE(review): {@code fileSrid} is validated here but not otherwise used by this method.
   *
   * @param context the workflow context used to resolve {@code database} into a data source
   * @throws WorkflowException if a required parameter is {@code null} or if an imported table
   *         cannot be found in the database after its creation
   * @throws Exception if opening the GeoPackage, the Calcite connection or the database
   *         connection fails, or if any SQL statement fails
   */
  public void execute(WorkflowContext context) throws Exception {
    // Validate required parameters
    if (file == null) {
      throw new WorkflowException("GeoPackage file path cannot be null");
    }
    if (fileSrid == null) {
      throw new WorkflowException("Source SRID cannot be null");
    }
    if (database == null) {
      throw new WorkflowException("Database connection cannot be null");
    }
    if (databaseSrid == null) {
      throw new WorkflowException("Target SRID cannot be null");
    }

    var path = file.toAbsolutePath();
    logger.info("Importing GeoPackage from: {}", path);

    var dataSource = context.getDataSource(database);

    // Set ThreadLocal DataSource for PostgresDdlExecutor to use
    PostgresDdlExecutor.setThreadLocalDataSource(dataSource);

    try {
      // Setup Calcite connection properties
      Properties info = new Properties();
      info.setProperty("lex", "MYSQL");
      info.setProperty("caseSensitive", "false");
      info.setProperty("unquotedCasing", "TO_LOWER");
      info.setProperty("quotedCasing", "TO_LOWER");
      info.setProperty("parserFactory", PostgresDdlExecutor.class.getName() + "#PARSER_FACTORY");

      // Create a GeoPackageSchema instance
      GeoPackageSchema geoPackageSchema = new GeoPackageSchema(path.toFile());

      // Create a temporary schema name for the GeoPackage data
      String schemaName = "geopackage_schema_" + System.currentTimeMillis();

      try (Connection connection = DriverManager.getConnection("jdbc:calcite:", info)) {
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
        SchemaPlus rootSchema = calciteConnection.getRootSchema();

        // Register the GeoPackage schema in the Calcite schema
        rootSchema.add(schemaName, geoPackageSchema);

        // Debug logging to check schema registration
        Schema registeredSchema = rootSchema.getSubSchema(schemaName);
        logger.info("Registered schema class: {}",
            registeredSchema != null ? registeredSchema.getClass().getName() : "null");
        logger.info("Is GeoPackageSchema: {}", registeredSchema instanceof GeoPackageSchema);

        // Read the feature table names directly from the GeoPackage file. The handle is managed
        // by try-with-resources so that it is released even if listing the tables fails.
        List<String> tables;
        try (GeoPackage geoPackage = GeoPackageManager.open(path.toFile())) {
          tables = new ArrayList<>(geoPackage.getFeatureTables());
        }

        if (tables.isEmpty()) {
          logger.warn("No tables found in GeoPackage: {}", path);
          return;
        }

        // Import each table
        for (String tableName : tables) {
          // Sanitize the target table name before it is embedded in SQL
          String sanitizedTableName = sanitizeTableName(tableName);
          logger.info("Importing table: {} to: {}", tableName, sanitizedTableName);

          // Create a table in PostgreSQL by selecting from the GeoPackage table.
          // NOTE(review): the source table name is interpolated as-is (unquoted); names that are
          // not plain identifiers will presumably fail to parse. Confirm the quoting rules of the
          // configured parser before quoting it here.
          String createTableSql = "CREATE TABLE " + sanitizedTableName + " AS " +
              "SELECT * FROM " + schemaName + "." + tableName;

          logger.info("Executing SQL: {}", createTableSql);

          // Execute the DDL statement to create the table
          try (Statement statement = connection.createStatement()) {
            statement.execute(createTableSql);
          }

          // A single database connection, obtained after the table has been created, serves all
          // follow-up statements for this table.
          try (Connection pgConnection = dataSource.getConnection()) {
            postProcessImportedTable(pgConnection, sanitizedTableName);
          }
        }
      }
    } finally {
      // Clean up thread local storage
      PostgresDdlExecutor.clearThreadLocalDataSource();
    }

    logger.info("Successfully imported GeoPackage to database");
  }

  /**
   * Applies the target SRID to a freshly imported table, verifies that the table exists and logs
   * the number of rows it contains.
   *
   * @param pgConnection an open connection to the target database; not closed by this method
   * @param tableName the sanitized name of the imported table
   * @throws WorkflowException if the table is not listed in {@code information_schema.tables}
   * @throws Exception if any of the SQL statements fails
   */
  private void postProcessImportedTable(Connection pgConnection, String tableName)
      throws Exception {
    // Set the SRID on the geometry column. Binding the values avoids both string-built SQL and
    // locale-dependent number formatting.
    // NOTE(review): the geometry column is assumed to be named 'geom' - confirm for all inputs.
    try (var sridStatement =
        pgConnection.prepareStatement("SELECT UpdateGeometrySRID(?, 'geom', ?)")) {
      sridStatement.setString(1, tableName);
      sridStatement.setInt(2, databaseSrid.intValue());
      sridStatement.execute();
    }

    // Verify that the table was created in PostgreSQL
    try (var existsStatement = pgConnection.prepareStatement(
        "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = ?)")) {
      existsStatement.setString(1, tableName);
      try (ResultSet resultSet = existsStatement.executeQuery()) {
        if (!resultSet.next() || !resultSet.getBoolean(1)) {
          throw new WorkflowException("Failed to create table: " + tableName);
        }
      }
    }

    // Verify that the table has data. Identifiers cannot be bound as parameters, so the
    // (sanitized) table name is concatenated here.
    try (Statement statement = pgConnection.createStatement();
        ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) FROM " + tableName)) {
      if (resultSet.next()) {
        // COUNT(*) is a 64-bit value; reading it as int would overflow for very large tables
        long count = resultSet.getLong(1);
        logger.info("Imported {} rows to table: {}", count, tableName);
        if (count == 0) {
          logger.warn("No rows were imported from GeoPackage to table: {}", tableName);
        }
      }
    }
  }