in baremaps-core/src/main/java/org/apache/baremaps/tasks/ImportGeoPackage.java [78:200]
public void execute(WorkflowContext context) throws Exception {
// Validate required parameters
if (file == null) {
throw new WorkflowException("GeoPackage file path cannot be null");
}
if (fileSrid == null) {
throw new WorkflowException("Source SRID cannot be null");
}
if (database == null) {
throw new WorkflowException("Database connection cannot be null");
}
if (databaseSrid == null) {
throw new WorkflowException("Target SRID cannot be null");
}
var path = file.toAbsolutePath();
logger.info("Importing GeoPackage from: {}", path);
var dataSource = context.getDataSource(database);
// Set ThreadLocal DataSource for PostgresDdlExecutor to use
PostgresDdlExecutor.setThreadLocalDataSource(dataSource);
try {
// Setup Calcite connection properties
Properties info = new Properties();
info.setProperty("lex", "MYSQL");
info.setProperty("caseSensitive", "false");
info.setProperty("unquotedCasing", "TO_LOWER");
info.setProperty("quotedCasing", "TO_LOWER");
info.setProperty("parserFactory", PostgresDdlExecutor.class.getName() + "#PARSER_FACTORY");
// Create a GeoPackageSchema instance
GeoPackageSchema geoPackageSchema = new GeoPackageSchema(path.toFile());
// Create a temporary schema name for the GeoPackage data
String schemaName = "geopackage_schema_" + System.currentTimeMillis();
try (Connection connection = DriverManager.getConnection("jdbc:calcite:", info)) {
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
SchemaPlus rootSchema = calciteConnection.getRootSchema();
// Register the GeoPackage schema in the Calcite schema
rootSchema.add(schemaName, geoPackageSchema);
// Debug logging to check schema registration
Schema registeredSchema = rootSchema.getSubSchema(schemaName);
logger.info("Registered schema class: {}",
registeredSchema != null ? registeredSchema.getClass().getName() : "null");
logger.info("Is GeoPackageSchema: {}", registeredSchema instanceof GeoPackageSchema);
// Get the list of tables in the GeoPackage
List<String> tables = new ArrayList<>();
// Get the tables directly from the GeoPackage file
GeoPackage geoPackage = GeoPackageManager.open(file.toFile());
tables.addAll(geoPackage.getFeatureTables());
geoPackage.close();
if (tables.isEmpty()) {
logger.warn("No tables found in GeoPackage: {}", path);
return;
}
// Import each table
for (String tableName : tables) {
// Sanitize table name to prevent SQL injection
String sanitizedTableName = sanitizeTableName(tableName);
logger.info("Importing table: {} to: {}", tableName, sanitizedTableName);
// Create a table in PostgreSQL by selecting from the GeoPackage table
String createTableSql = "CREATE TABLE " + sanitizedTableName + " AS " +
"SELECT * FROM " + schemaName + "." + tableName;
logger.info("Executing SQL: {}", createTableSql);
// Execute the DDL statement to create the table
try (Statement statement = connection.createStatement()) {
statement.execute(createTableSql);
}
// Set SRID on geometry column if specified
try (Connection pgConnection = dataSource.getConnection();
Statement stmt = pgConnection.createStatement()) {
stmt.execute(String.format(
"SELECT UpdateGeometrySRID('%s', 'geom', %d)",
sanitizedTableName, databaseSrid));
}
// Verify that the table was created in PostgreSQL
try (Connection pgConnection = dataSource.getConnection();
Statement statement = pgConnection.createStatement();
ResultSet resultSet = statement.executeQuery(
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '" +
sanitizedTableName + "')")) {
if (!resultSet.next() || !resultSet.getBoolean(1)) {
throw new WorkflowException("Failed to create table: " + sanitizedTableName);
}
}
// Verify that the table has data
try (Connection pgConnection = dataSource.getConnection();
Statement statement = pgConnection.createStatement();
ResultSet resultSet = statement.executeQuery(
"SELECT COUNT(*) FROM " + sanitizedTableName)) {
if (resultSet.next()) {
int count = resultSet.getInt(1);
logger.info("Imported {} rows to table: {}", count, sanitizedTableName);
if (count == 0) {
logger.warn("No rows were imported from GeoPackage to table: {}",
sanitizedTableName);
}
}
}
}
}
} finally {
// Clean up thread local storage
PostgresDdlExecutor.clearThreadLocalDataSource();
}
logger.info("Successfully imported GeoPackage to database");
}