in odps-sqoop/src/java/org/apache/sqoop/manager/oracle/OraOopManagerFactory.java [810:1094]
private void createAnyRequiredOracleObjects(SqoopOptions sqoopOptions,
Connection connection, OraOopConnManager oraOopConnManager,
List<OraOopLogMessage> messagesToDisplayAfterWelcome) throws SQLException {
Configuration conf = sqoopOptions.getConf();
// The SYSDATE on the Oracle database will be used as the partition value
// for this export job...
Object sysDateTime = OraOopOracleQueries.getSysDate(connection);
String sysDateStr =
OraOopOracleQueries.oraDATEToString(sysDateTime, "yyyy-mm-dd hh24:mi:ss");
OraOopUtilities.rememberOracleDateTime(conf,
OraOopConstants.ORAOOP_JOB_SYSDATE, sysDateStr);
checkForOldOraOopTemporaryOracleTables(connection, sysDateTime,
OraOopOracleQueries.getCurrentSchema(connection),
messagesToDisplayAfterWelcome);
// Store the actual partition value, so the N mappers know what value to
// insert...
String partitionValue =
OraOopOracleQueries.oraDATEToString(sysDateTime,
OraOopConstants.ORAOOP_EXPORT_PARTITION_DATE_FORMAT);
conf.set(OraOopConstants.ORAOOP_EXPORT_PARTITION_DATE_VALUE,
partitionValue);
// Generate the (22 character) partition name...
String partitionName =
OraOopUtilities
.createExportTablePartitionNameFromOracleTimestamp(sysDateTime);
int numMappers = sqoopOptions.getNumMappers();
String exportTableTemplate =
conf.get(OraOopConstants.ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE, "");
String user = sqoopOptions.getUsername();
if (user == null) {
user = OracleManager.getSessionUser(connection);
}
OracleTable templateTableContext =
OraOopUtilities.decodeOracleTableName(user, exportTableTemplate);
boolean noLoggingOnNewTable =
conf.getBoolean(OraOopConstants.ORAOOP_EXPORT_CREATE_TABLE_NO_LOGGING,
false);
String updateKeyCol = sqoopOptions.getUpdateKeyCol();
/* =========================== */
/* VALIDATION OF INPUTS */
/* =========================== */
if (updateKeyCol == null || updateKeyCol.isEmpty()) {
// We're performing an "insert" export, not an "update" export.
// Check that the "oraoop.export.merge" property has not been specified,
// as this would be
// an invalid scenario...
if (OraOopUtilities.getExportUpdateMode(conf) == UpdateMode.Merge) {
throw new RuntimeException(String.format(
"\n\nThe option \"%s\" can only be used if \"%s\" is "
+ "also used.\n", OraOopConstants.ORAOOP_EXPORT_MERGE,
"--update-key"));
}
}
if (OraOopUtilities
.userWantsToCreatePartitionedExportTableFromTemplate(conf)
|| OraOopUtilities
.userWantsToCreateNonPartitionedExportTableFromTemplate(conf)) {
// OraOop will create the export table.
if (oraOopConnManager.getOracleTableContext().getName().length()
> OraOopConstants.Oracle.MAX_IDENTIFIER_LENGTH) {
String msg =
String.format(
"The Oracle table name \"%s\" is longer than %d characters.\n"
+ "Oracle will not allow a table with this name to be created.",
oraOopConnManager.getOracleTableContext().getName(),
OraOopConstants.Oracle.MAX_IDENTIFIER_LENGTH);
throw new RuntimeException(msg);
}
if (updateKeyCol != null && !updateKeyCol.isEmpty()) {
// We're performing an "update" export, not an "insert" export.
// Check whether the user is attempting an "update" (i.e. a non-merge).
// If so, they're
// asking to only UPDATE rows in a (about to be created) (empty) table
// that contains no rows.
// This will be a waste of time, as we'd be attempting to perform UPDATE
// operations against a
// table with no rows in it...
UpdateMode updateMode = OraOopUtilities.getExportUpdateMode(conf);
if (updateMode == UpdateMode.Update) {
throw new RuntimeException(String.format(
"\n\nCombining the option \"%s\" with the option \"%s=false\" is "
+ "nonsensical, as this would create an "
+ "empty table and then perform "
+ "a lot of work that results in a table containing no rows.\n",
OraOopConstants.ORAOOP_EXPORT_CREATE_TABLE_TEMPLATE,
OraOopConstants.ORAOOP_EXPORT_MERGE));
}
}
// Check that the specified template table actually exists and is a
// table...
String templateTableObjectType =
OraOopOracleQueries.getOracleObjectType(connection,
templateTableContext);
if (templateTableObjectType == null) {
throw new RuntimeException(String.format(
"The specified Oracle template table \"%s\" does not exist.",
templateTableContext.toString()));
}
if (!templateTableObjectType
.equalsIgnoreCase(OraOopConstants.Oracle.OBJECT_TYPE_TABLE)) {
throw new RuntimeException(
String.format(
"The specified Oracle template table \"%s\" is not an "
+ "Oracle table, it's a %s.",
templateTableContext.toString(), templateTableObjectType));
}
if (conf.getBoolean(OraOopConstants.ORAOOP_EXPORT_CREATE_TABLE_DROP,
false)) {
OraOopOracleQueries.dropTable(connection, oraOopConnManager
.getOracleTableContext());
}
// Check that there is no existing database object with the same name of
// the table to be created...
String newTableObjectType =
OraOopOracleQueries.getOracleObjectType(connection, oraOopConnManager
.getOracleTableContext());
if (newTableObjectType != null) {
throw new RuntimeException(
String.format(
"%s cannot create a new Oracle table named %s as a \"%s\" "
+ "with this name already exists.",
OraOopConstants.ORAOOP_PRODUCT_NAME, oraOopConnManager
.getOracleTableContext().toString(), newTableObjectType));
}
} else {
// The export table already exists.
if (updateKeyCol != null && !updateKeyCol.isEmpty()) {
// We're performing an "update" export, not an "insert" export.
// Check that there exists an index on the export table on the
// update-key column(s).
// Without such an index, this export may perform like a real dog...
String[] updateKeyColumns =
OraOopUtilities.getExportUpdateKeyColumnNames(sqoopOptions);
if (!OraOopOracleQueries.doesIndexOnColumnsExist(connection,
oraOopConnManager.getOracleTableContext(), updateKeyColumns)) {
String msg = String.format(
"\n**************************************************************"
+ "***************************************************************"
+ "\n\tThe table %1$s does not have a valid index on "
+ "the column(s) %2$s.\n"
+ "\tAs a consequence, this export may take a long time to "
+ "complete.\n"
+ "\tIf performance is unacceptable, consider reattempting this "
+ "job after creating an index "
+ "on this table via the SQL...\n"
+ "\t\tcreate index <index_name> on %1$s(%2$s);\n"
+ "****************************************************************"
+ "*************************************************************",
oraOopConnManager.getOracleTableContext().toString(),
OraOopUtilities.stringArrayToCSV(updateKeyColumns));
messagesToDisplayAfterWelcome.add(new OraOopLogMessage(
OraOopConstants.Logging.Level.WARN, msg));
}
}
}
/* ================================= */
/* CREATE A PARTITIONED TABLE */
/* ================================= */
if (OraOopUtilities
.userWantsToCreatePartitionedExportTableFromTemplate(conf)) {
// Create a new Oracle table using the specified template...
String[] subPartitionNames =
OraOopUtilities.generateExportTableSubPartitionNames(numMappers,
sysDateTime, conf);
// Create the export table from a template table...
String tableStorageClause =
OraOopUtilities.getExportTableStorageClause(conf);
OraOopOracleQueries.createExportTableFromTemplateWithPartitioning(
connection, oraOopConnManager.getOracleTableContext(),
tableStorageClause, templateTableContext, noLoggingOnNewTable,
partitionName, sysDateTime, sqoopOptions.getNumMappers(),
subPartitionNames);
return;
}
/* ===================================== */
/* CREATE A NON-PARTITIONED TABLE */
/* ===================================== */
if (OraOopUtilities
.userWantsToCreateNonPartitionedExportTableFromTemplate(conf)) {
String tableStorageClause =
OraOopUtilities.getExportTableStorageClause(conf);
OraOopOracleQueries.createExportTableFromTemplate(connection,
oraOopConnManager.getOracleTableContext(), tableStorageClause,
templateTableContext, noLoggingOnNewTable);
return;
}
/* ===================================================== */
/* ADD ADDITIONAL PARTITIONS TO AN EXISTING TABLE */
/* ===================================================== */
// If the export table is partitioned, and the partitions were created by
// OraOop, then we need
// create additional partitions...
OracleTablePartitions tablePartitions =
OraOopOracleQueries.getPartitions(connection, oraOopConnManager
.getOracleTableContext());
// Find any partition name starting with "ORAOOP_"...
OracleTablePartition oraOopPartition =
tablePartitions.findPartitionByRegEx("^"
+ OraOopConstants.EXPORT_TABLE_PARTITION_NAME_PREFIX);
if (tablePartitions.size() > 0 && oraOopPartition == null) {
for (int idx = 0; idx < tablePartitions.size(); idx++) {
messagesToDisplayAfterWelcome.add(new OraOopLogMessage(
OraOopConstants.Logging.Level.INFO, String.format(
"The Oracle table %s has a partition named \"%s\".",
oraOopConnManager.getOracleTableContext().toString(),
tablePartitions.get(idx).getName())));
}
messagesToDisplayAfterWelcome.add(new OraOopLogMessage(
OraOopConstants.Logging.Level.WARN, String.format(
"The Oracle table %s is partitioned.\n"
+ "These partitions were not created by %s.",
oraOopConnManager.getOracleTableContext().toString(),
OraOopConstants.ORAOOP_PRODUCT_NAME)));
}
if (oraOopPartition != null) {
// Indicate in the configuration what's happening...
conf.setBoolean(OraOopConstants.EXPORT_TABLE_HAS_ORAOOP_PARTITIONS, true);
messagesToDisplayAfterWelcome
.add(new OraOopLogMessage(
OraOopConstants.Logging.Level.INFO,
String
.format(
"The Oracle table %s is partitioned.\n"
+ "These partitions were created by %s, so "
+ "additional partitions will now be created.\n"
+ "The name of the new partition will be \"%s\".",
oraOopConnManager.getOracleTableContext().toString(),
OraOopConstants.ORAOOP_PRODUCT_NAME, partitionName)));
String[] subPartitionNames =
OraOopUtilities.generateExportTableSubPartitionNames(numMappers,
sysDateTime, conf);
// Add another partition (and N subpartitions) to this existing,
// partitioned export table...
OraOopOracleQueries.createMoreExportTablePartitions(connection,
oraOopConnManager.getOracleTableContext(), partitionName,
sysDateTime, subPartitionNames);
return;
}
}