in ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java [860:1157]
private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception {
Long lastReplId;
// Get the list of events matching dbPattern & tblPattern, then go through each event
// and dump it to an event-level dump dir inside the dump root.
String validTxnList = null;
long waitUntilTime = 0;
long bootDumpBeginReplId = -1;
List<String> tableList = work.replScope.includeAllTables() ? null : new ArrayList<>();
SnapshotUtils.ReplSnapshotCount snapshotCount = null;
// If we are bootstrapping ACID tables, we need to perform steps similar to a regular
// bootstrap (see bootstrapDump() for more details). The only difference here is that instead
// of waiting for the concurrent transactions to finish, we start dumping the incremental
// events and wait only for the remaining time, if any.
if (needBootstrapAcidTablesDuringIncrementalDump()) {
work.setBootstrap(true);
bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L);
assert (bootDumpBeginReplId >= 0);
LOG.info("Dump for bootstrapping ACID tables during an incremental dump for db {}",
work.dbNameOrPattern);
long timeoutInMs = HiveConf.getTimeVar(conf,
HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
waitUntilTime = System.currentTimeMillis() + timeoutInMs;
}
// TODO : instead of simply restricting by message format, we should eventually
// move to a jdbc-driver-style registration of message formats and pick the message
// factory per event to decode. For now, however, since all messages have the
// same factory, restricting by message format is effectively a guard against
// older leftover data that would cause us problems.
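// Fetch the database object; unless this is a metadata-only dump, mark it as a replication source.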
String dbName = work.dbNameOrPattern;
Database db = hiveDb.getDatabase(dbName);
if (!HiveConf.getBoolVar(conf, REPL_DUMP_METADATA_ONLY)) {
setReplSourceFor(hiveDb, dbName, db);
}
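// For a planned failover, tag the database as the failover SOURCE endpoint, bump the failover
// metrics, and cap this dump at the failover event id recorded in the failover metadata so that
// no events beyond that point are dumped.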
if (shouldFailover()) {
if (!MetaStoreUtils.isDbBeingPlannedFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.SOURCE)) {
// Mark replication failover as enabled at the source endpoint.
HashMap<String, String> params = new HashMap<>(db.getParameters());
params.put(ReplConst.REPL_FAILOVER_ENDPOINT, MetaStoreUtils.FailoverEndpoint.SOURCE.toString());
params.put(ReplConst.REPL_METRICS_LAST_FAILOVER_TYPE, ReplConst.FailoverType.PLANNED.toString());
LOG.info("Replication Metrics: Setting last failover type for database: {} to: {} ", dbName, ReplConst.FailoverType.PLANNED.toString());
int failoverCount = 1 + NumberUtils.toInt(params.getOrDefault(ReplConst.REPL_METRICS_FAILOVER_COUNT, "0"), 0);
LOG.info("Replication Metrics: Setting replication metrics failover count for target database: {} to: {} ", dbName, failoverCount);
params.put(ReplConst.REPL_METRICS_FAILOVER_COUNT, Integer.toString(failoverCount));
db.setParameters(params);
getHive().alterDatabase(work.dbNameOrPattern, db);
}
fetchFailoverMetadata(hiveDb);
assert work.getFailoverMetadata().isValidMetadata();
work.overrideLastEventToDump(hiveDb, bootDumpBeginReplId, work.getFailoverMetadata().getFailoverEventId());
} else {
work.overrideLastEventToDump(hiveDb, bootDumpBeginReplId, -1);
}
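// Build the event iterator: only events that match the replication scope and catalog and fall
// within the [eventFrom, eventTo] boundary are dumped, up to the configured per-dump event limit.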
IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
new ReplEventFilter(work.replScope),
new CatalogFilter(MetaStoreUtils.getDefaultCatalog(conf)),
new EventBoundaryFilter(work.eventFrom, work.eventTo));
EventUtils.MSClientNotificationFetcher evFetcher
= new EventUtils.MSClientNotificationFetcher(hiveDb);
int maxEventLimit = getMaxEventAllowed(work.maxEventLimit());
EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
evFetcher, work.eventFrom, maxEventLimit, evFilter);
lastReplId = work.eventTo;
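// The EVENTS_DUMP ack file carries the events-dump metadata; on a resumed dump it tells us the
// last event id already dumped (resumeFrom) and whether the previous run batched its events.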
Path ackFile = new Path(dumpRoot, ReplAck.EVENTS_DUMP.toString());
boolean shouldBatch = conf.getBoolVar(HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS);
EventsDumpMetadata eventsDumpMetadata =
Utils.fileExists(ackFile, conf) ? EventsDumpMetadata.deserialize(ackFile, conf)
: new EventsDumpMetadata(work.eventFrom, 0, shouldBatch);
long resumeFrom = eventsDumpMetadata.getLastReplId();
long estimatedNumEvents = evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo,
maxEventLimit);
try {
IncrementalDumpLogger replLogger =
new IncrementalDumpLogger(dbName, dumpRoot.toString(), estimatedNumEvents, work.eventFrom, work.eventTo,
maxEventLimit);
work.setReplLogger(replLogger);
replLogger.startLog();
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.EVENTS.name(), estimatedNumEvents);
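// Count the tables pending bootstrap for the metrics report; when this dump resumes after a
// failover, only the tables that are both in scope and pending bootstrap are counted.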
int size = tablesForBootstrap.size();
if (db != null && db.getParameters()!=null &&
Boolean.parseBoolean(db.getParameters().get(REPL_RESUME_STARTED_AFTER_FAILOVER))) {
Collection<String> allTables = Utils.getAllTables(hiveDb, dbName, work.replScope);
allTables.retainAll(tablesForBootstrap);
size = allTables.size();
}
if (size > 0) {
metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) size);
}
if (shouldFailover()) {
Map<String, String> params = db.getParameters();
String dbFailoverEndPoint = "";
if (params != null) {
dbFailoverEndPoint = params.get(ReplConst.REPL_FAILOVER_ENDPOINT);
LOG.debug("Replication Metrics: setting failover endpoint to {} ", dbFailoverEndPoint);
} else {
LOG.warn("Replication Metrics: Cannot obtained failover endpoint info, setting failover endpoint to null ");
}
work.getMetricCollector().reportFailoverStart(getName(), metricMap, work.getFailoverMetadata(), dbFailoverEndPoint, ReplConst.FailoverType.PLANNED.toString());
} else {
work.getMetricCollector().reportStageStart(getName(), metricMap);
}
long dumpedCount = resumeFrom - work.eventFrom;
if (dumpedCount > 0) {
LOG.info("Event id {} to {} are already dumped, skipping {} events", work.eventFrom, resumeFrom, dumpedCount);
}
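// Events are optionally grouped into batch directories of at most REPL_APPROX_MAX_LOAD_TASKS
// events each; a resumed dump must use the same batching mode as the earlier incomplete run.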
boolean isStagingDirCheckedForFailedEvents = false;
int batchNo = 0, eventCount = 0;
final int maxEventsPerBatch = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
Path eventRootDir = dumpRoot;
if (shouldBatch && maxEventsPerBatch == 0) {
throw new SemanticException(String.format(
"batch size configured via %s cannot be set to zero since batching is enabled",
HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname));
}
if (eventsDumpMetadata.isEventsBatched() != shouldBatch) {
LOG.error("Failed to resume from previous dump. {} was set to {} in previous dump but currently it's" +
" set to {}. Cannot dump events in {} manner because they were {} batched in " +
"the previous incomplete run",
HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS.varname, eventsDumpMetadata.isEventsBatched(),
shouldBatch, shouldBatch ? "batched" : "sequential", shouldBatch ? "not" : ""
);
throw new HiveException(
String.format("Failed to resume from previous dump. %s must be set to %s, but currently it's set to %s",
HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS,
eventsDumpMetadata.isEventsBatched(), shouldBatch)
);
}
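// Dump each remaining event into its own directory under the current batch (or directly under
// the dump root when batching is disabled), skipping events at or below resumeFrom and
// persisting progress to the ack file after every event so a failed dump can resume.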
while (evIter.hasNext()) {
NotificationEvent ev = evIter.next();
lastReplId = ev.getEventId();
if (shouldBatch && eventCount++ % maxEventsPerBatch == 0) {
eventRootDir = new Path(dumpRoot, String.format(ReplUtils.INC_EVENTS_BATCH, ++batchNo));
}
if (ev.getEventId() <= resumeFrom) {
continue;
}
// Check for and remove any remnant event file from the staging directory if a previous incremental repl dump failed.
if (!isStagingDirCheckedForFailedEvents) {
cleanFailedEventDirIfExists(eventRootDir, ev.getEventId());
isStagingDirCheckedForFailedEvents = true;
}
// Skip events on materialized views unless materialized-view replication is enabled.
if (!isMaterializedViewsReplEnabled()) {
String tblName = ev.getTableName();
if (tblName != null) {
try {
Table table = hiveDb.getTable(dbName, tblName);
if (table != null && TableType.MATERIALIZED_VIEW.equals(table.getTableType())) {
LOG.info("Attempt to dump materialized view : " + tblName);
continue;
}
} catch (InvalidTableException te) {
LOG.debug(te.getMessage());
}
}
}
Path eventDir = new Path(eventRootDir, String.valueOf(lastReplId));
dumpEvent(ev, eventDir, dumpRoot, cmRoot, hiveDb, eventsDumpMetadata);
eventsDumpMetadata.setLastReplId(lastReplId);
Utils.writeOutput(eventsDumpMetadata.serialize(), ackFile, conf);
}
replLogger.endLog(lastReplId.toString());
LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
} finally {
// Always write the dmd, irrespective of success/failure, to enable checkpointing in table-level replication.
long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
if (work.isSecondDumpAfterFailover()) {
dmd.setDump(DumpType.OPTIMIZED_BOOTSTRAP, work.eventFrom, lastReplId, cmRoot, executionId,
previousReplScopeModified());
} else {
dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId,
previousReplScopeModified());
}
// If the repl policy has changed (oldReplScope is set), pass the current replication policy
// so that REPL LOAD drops the tables which are not included in the current policy.
dmd.setReplScope(work.replScope);
dmd.write(true);
}
// Get snapshot related configurations for external data copy.
boolean isSnapshotEnabled = conf.getBoolVar(REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY);
String snapshotPrefix = dbName.toLowerCase();
ArrayList<String> prevSnaps = new ArrayList<>();
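// The managed and external table file lists accumulate the data paths to be copied later, and
// the snapshot file list (when snapshot diff is enabled) records the external table paths that
// are copied using HDFS snapshots.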
try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, conf);
FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, conf);
FileList snapPathFileList = isSnapshotEnabled ? createTableFileList(
SnapshotUtils.getSnapshotFileListPath(dumpRoot), EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT, conf) : null) {
// Examine all the tables if required.
if (shouldExamineTablesToDump() || (tableList != null)) {
// If required, wait longer for any transactions that were open at the time the ACID bootstrap started.
if (needBootstrapAcidTablesDuringIncrementalDump()) {
assert (waitUntilTime > 0);
validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
}
/* When the same dump dir is resumed because of check-pointing, we need to clear the existing metadata.
The metadata must be rewritten because the write id list will have changed.
We can't reuse the previous write id as it might have been invalidated by compaction. */
Path bootstrapRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
Path metadataPath = new Path(bootstrapRoot, EximUtil.METADATA_PATH_NAME);
FileSystem fs = FileSystem.get(metadataPath.toUri(), conf);
try {
fs.delete(metadataPath, true);
} catch (FileNotFoundException e) {
// no worries
}
Path dbRootMetadata = new Path(metadataPath, dbName);
Path dbRootData = new Path(bootstrapRoot, EximUtil.DATA_PATH_NAME + File.separator + dbName);
boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
ReplExternalTables externalTablesWriter = new ReplExternalTables(conf);
boolean isSingleTaskForExternalDb =
conf.getBoolVar(REPL_EXTERNAL_WAREHOUSE_SINGLE_COPY_TASK) && work.replScope.includeAllTables();
HashMap<String, Boolean> singleCopyPaths = getNonTableLevelCopyPaths(db, isSingleTaskForExternalDb);
boolean isExternalTablePresent = false;
if (isSnapshotEnabled) {
snapshotCount = new SnapshotUtils.ReplSnapshotCount();
if (snapPathFileList.hasNext()) {
prevSnaps = getListFromFileList(snapPathFileList);
}
}
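// Walk every table matching the replication policy: record external table data locations,
// bootstrap-dump the tables that need it (e.g. ACID tables or tables newly added to the policy),
// and collect the table names for the table list written to the dump location.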
ExportService exportService = new ExportService(conf);
for (String matchedDbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
for (String tableName : Utils.matchesTbl(hiveDb, matchedDbName, work.replScope)) {
try {
Table table = hiveDb.getTable(matchedDbName, tableName);
// Dump external table locations if required.
if (TableType.EXTERNAL_TABLE.equals(table.getTableType()) && shouldDumpExternalTableLocation(conf)) {
externalTablesWriter
.dataLocationDump(table, extTableFileList, singleCopyPaths, !isSingleTaskForExternalDb, conf);
isExternalTablePresent = true;
}
// Dump the table to be bootstrapped if required.
if (shouldBootstrapDumpTable(table)) {
HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, matchedDbName).table(table);
dumpTable(exportService, matchedDbName, tableName, validTxnList, dbRootMetadata, dbRootData, bootDumpBeginReplId,
hiveDb, tableTuple, managedTblList, dataCopyAtLoad);
}
if (tableList != null && doesTableSatisfyConfig(table)) {
tableList.add(tableName);
}
} catch (InvalidTableException te) {
// Repl dump shouldn't fail if the table is dropped/renamed while dumping it.
// Just log a debug message and skip it.
LOG.debug(te.getMessage());
}
}
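// Wait for the asynchronous table dump tasks submitted for this database to finish before
// moving on, then shut the export service down.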
if (exportService != null && exportService.isExportServiceRunning()) {
try {
exportService.waitForTasksToFinishAndShutdown();
} catch (SemanticException e) {
LOG.error("ExportService thread failed to perform table dump operation ", e.getCause());
throw new SemanticException(e.getMessage(), e);
}
try {
exportService.await(60, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error("Error while shutting down ExportService ", e);
}
}
// If this is not a table-level replication, add a single copy task for
// the database's default location and the configured paths.
if (isExternalTablePresent && shouldDumpExternalTableLocation(conf) && isSingleTaskForExternalDb) {
externalTablesWriter
.dumpNonTableLevelCopyPaths(singleCopyPaths, extTableFileList, conf, isSnapshotEnabled, snapshotPrefix,
snapshotCount, snapPathFileList, prevSnaps, false);
}
}
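// For table-level replication, persist the list of tables in the current policy to the dump location.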
dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
}
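// Hand the collected managed and external table file lists over to the data copy tasks.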
setDataCopyIterators(extTableFileList, managedTblList);
work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, lastReplId, snapshotCount, null);
// Clean-up snapshots
if (isSnapshotEnabled) {
cleanupSnapshots(SnapshotUtils.getSnapshotFileListPath(dumpRoot), work.dbNameOrPattern.toLowerCase(), conf,
snapshotCount, false);
}
return lastReplId;
}
}