public void run()

in phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java [297:622]


        /**
         * Scans this region for index rows carrying a non-zero INDEX_DISABLE_TIMESTAMP, advances
         * each such index through its recovery state transitions, and partially rebuilds the
         * indexes that are ready, grouped by data table.
         *
         * <p>Phase 1 (scan loop). For every matching row the index state decides what happens:
         * <ul>
         *   <li>PENDING_DISABLE: moved to DISABLE once more than pendingDisableThreshold ms have
         *       elapsed since the pending-disable count was last updated; never rebuilt in this
         *       run.</li>
         *   <li>Elapsed time above indexDisableTimestampThreshold: marked DISABLE with a disable
         *       timestamp of 0 and skipped.</li>
         *   <li>DISABLE is moved to INACTIVE and PENDING_ACTIVE is moved to ACTIVE; neither is
         *       rebuilt in this run.</li>
         *   <li>INACTIVE or ACTIVE: queued for a partial rebuild once the forward overlap
         *       duration has passed since the INDEX_STATE cell's timestamp.</li>
         * </ul>
         *
         * <p>Phase 2 (rebuild loop). For each data table with queued indexes, a time-ranged scan
         * of the data table is executed with the REBUILD_INDEXES attribute set. Afterwards each
         * index is either marked ACTIVE (when the scan end time equals the latest upper bound)
         * or has its disable timestamp advanced to the end of the batch that was just processed.
         *
         * <p>Nothing is propagated to the caller: every Throwable is caught and logged, and the
         * scanner and connection are closed in the finally block.
         */
        public void run() {
            // FIXME: we should replay the data table Put, as doing a partial index build would only add
            // the new rows and not delete the previous index value. Also, we should restrict the scan
            // to only data within this region (as otherwise *every* region will be running this code
            // separately, all updating the same data).
            RegionScanner scanner = null;
            // Opened lazily inside the scan loop, only once a row passes the basic validity checks.
            PhoenixConnection conn = null;
            try {
                // Select only rows whose INDEX_DISABLE_TIMESTAMP differs from 0; with
                // setFilterIfMissing(true) rows that lack the column are filtered out as well.
                Scan scan = new Scan();
                SingleColumnValueFilter filter = new SingleColumnValueFilter(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                    PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
                    CompareFilter.CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0L));
                filter.setFilterIfMissing(true);
                scan.setFilter(filter);
                // Only these four columns are read back from each matching row.
                scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                    PhoenixDatabaseMetaData.TABLE_NAME_BYTES);
                scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                    PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
                scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                    PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
                scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                    PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);

                // Data table -> list of (index, upper bound timestamp of its rebuild). Stays null
                // until the connection is created, i.e. until the first candidate row is seen.
                Map<PTable, List<Pair<PTable,Long>>> dataTableToIndexesMap = null;
                boolean hasMore = false;
                List<Cell> results = new ArrayList<Cell>();
                scanner = this.env.getRegion().getScanner(scan);

                // Phase 1: walk the matching rows and decide, per index, whether to change its
                // state, skip it, or queue it for a partial rebuild.
                do {
                    results.clear();
                    hasMore = scanner.next(results);
                    // Nothing was returned by this call: stop scanning.
                    if (results.isEmpty()) {
                        LOGGER.debug("Found no indexes with non zero INDEX_DISABLE_TIMESTAMP");
                        break;
                    }

                    Result r = Result.create(results);
                    byte[] disabledTimeStamp = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                        PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
                    // The cell (not just the value) is kept because its timestamp is used further
                    // down to decide when the rebuild may start.
                    Cell indexStateCell = r.getColumnLatestCell(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES);

                    if (disabledTimeStamp == null || disabledTimeStamp.length == 0) {
                        LOGGER.debug("Null or empty INDEX_DISABLE_TIMESTAMP");
                        continue;
                    }

                    byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                        PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
                    if ((dataTable == null || dataTable.length == 0) || indexStateCell == null) {
                        // data table name can't be empty
                        LOGGER.debug("Null or data table name or index state");
                        continue;
                    }

                    byte[] indexStateBytes = CellUtil.cloneValue(indexStateCell);
                    // Split the row key into three components and read the schema name and the
                    // table name slots; for these rows the table name is used as the index name.
                    byte[][] rowKeyMetaData = new byte[3][];
                    SchemaUtil.getVarChars(r.getRow(), 3, rowKeyMetaData);
                    byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
                    byte[] indexTable = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];

                    // validity check
                    if (indexTable == null || indexTable.length == 0) {
                        LOGGER.debug("We find IndexTable empty during rebuild scan:" + scan
                                + "so, Index rebuild has been skipped for row=" + r);
                        continue;
                    }
                    
                    String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
                    // When onlyTheseTables is non-null, only the data tables it contains are
                    // processed.
                    if (onlyTheseTables != null && !onlyTheseTables.contains(dataTableFullName)) {
                        LOGGER.debug("Could not find " + dataTableFullName +
                                " in " + onlyTheseTables);
                        continue;
                    }

                    // First candidate row: open the connection and create the result map together.
                    if (conn == null) {
                        conn = getRebuildIndexConnection(env.getConfiguration());
                        dataTableToIndexesMap = Maps.newHashMap();
                    }
                    PTable dataPTable = conn.getTableNoCache(dataTableFullName);

                    String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTable);
                    PTable indexPTable = conn.getTableNoCache(indexTableFullName);
                    // Sanity check in case index was removed from table
                    if (!dataPTable.getIndexes().contains(indexPTable)) {
                        LOGGER.debug(dataTableFullName + " does not contain " +
                                indexPTable.getName().getString());
                        continue;
                    }
                    
                    // The state is decoded from the first byte of the INDEX_STATE cell value.
                    PIndexState indexState = PIndexState.fromSerializedValue(indexStateBytes[0]);
                    // Elapsed time is measured from the timestamp returned by
                    // getIndexPendingDisableCountLastUpdatedTimestamp, not from the
                    // INDEX_DISABLE_TIMESTAMP value read above.
                    long pendingDisableCountLastUpdatedTs =
                        IndexUtil.getIndexPendingDisableCountLastUpdatedTimestamp(conn, indexTableFullName);
                    long elapsedSinceDisable =
                        EnvironmentEdgeManager.currentTimeMillis() - pendingDisableCountLastUpdatedTs;

                    // on an index write failure, the server side transitions to PENDING_DISABLE, then the client
                    // retries, and after retries are exhausted, disables the index
                    if (indexState == PIndexState.PENDING_DISABLE) {
                        if (elapsedSinceDisable > pendingDisableThreshold) {
                            // too long in PENDING_DISABLE -
                            // client didn't disable the index because last time when
                            // PENDING_DISABLE_COUNT was updated is greater than pendingDisableThreshold,
                            // so we do it here
                            IndexUtil.updateIndexState(conn, indexTableFullName,
                                PIndexState.DISABLE, pendingDisableCountLastUpdatedTs);
                        }
                        // Whether or not the state was changed, a PENDING_DISABLE index is never
                        // rebuilt in this run.
                        continue;
                    }

                    // Only perform relatively expensive check for all regions online when index
                    // is disabled or pending active since that's the state it's placed into when
                    // an index write fails.
                    if ((indexState.isDisabled() || indexState == PIndexState.PENDING_ACTIVE)
                            && !tableRegionsOnline(this.env.getConfiguration(), indexPTable)) {
                        LOGGER.debug("Index rebuild has been skipped because not all regions of" +
                                " index table=" + indexPTable.getName() + " are online.");
                        continue;
                    }

                    if (elapsedSinceDisable > indexDisableTimestampThreshold) {
                        /*
                         * It has been too long since the index has been disabled and any future
                         * attempts to reenable it likely will fail. So we are going to mark the
                         * index as disabled and set the index disable timestamp to 0 so that the
                         * rebuild task won't pick up this index again for rebuild.
                         */
                        try {
                            IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.DISABLE, 0l);
                            // Logged at error level even though the state update succeeded: the
                            // index now requires a manual rebuild.
                            LOGGER.error("Unable to rebuild index " + indexTableFullName
                                    + ". Won't attempt again since index disable timestamp is" +
                                    " older than current time by " + indexDisableTimestampThreshold
                                    + " milliseconds. Manual intervention needed to re-build" +
                                    " the index");
                        } catch (Throwable ex) {
                            LOGGER.error(
                                "Unable to mark index " + indexTableFullName + " as disabled.", ex);
                        }
                        continue; // don't attempt another rebuild irrespective of whether
                                  // updateIndexState worked or not
                    }
                    // Allow index to begin incremental maintenance as index is back online and we
                    // cannot transition directly from DISABLED -> ACTIVE
                    if (indexState == PIndexState.DISABLE) {
                        if(IndexUtil.getIndexPendingDisableCount(conn, indexTableFullName) < PENDING_DISABLE_INACTIVE_STATE_COUNT){
                            // to avoid incrementing again
                            IndexUtil.incrementCounterForIndex(conn, indexTableFullName, PENDING_DISABLE_INACTIVE_STATE_COUNT);
                        }
                        IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.INACTIVE, null);
                        continue; // Must wait until clients start to do index maintenance again
                    } else if (indexState == PIndexState.PENDING_ACTIVE) {
                        IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.ACTIVE, null);
                        continue; // Must wait until clients start to do index maintenance again
                    } else if (indexState != PIndexState.INACTIVE && indexState != PIndexState.ACTIVE) {
                        LOGGER.warn("Unexpected index state of " + indexTableFullName + "="
                                + indexState + ". Skipping partial rebuild attempt.");
                        continue;
                    }
                    // From here on the index is INACTIVE or ACTIVE and is a rebuild candidate.
                    long currentTime = EnvironmentEdgeManager.currentTimeMillis();
                    long forwardOverlapDurationMs = env.getConfiguration().getLong(
                            QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, 
                                    QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME);
                    // Wait until no failures have occurred in at least forwardOverlapDurationMs
                    if (indexStateCell.getTimestamp() + forwardOverlapDurationMs > currentTime) {
                        LOGGER.debug("Still must wait " + (indexStateCell.getTimestamp() +
                                forwardOverlapDurationMs - currentTime) +
                                " before starting rebuild for " + indexTableFullName);
                        continue; // Haven't waited long enough yet
                    }
                    // Upper bound = timestamp of the INDEX_STATE cell plus the forward overlap.
                    Long upperBoundOfRebuild = indexStateCell.getTimestamp() + forwardOverlapDurationMs;
                    // Pass in upperBoundOfRebuild when setting index state or increasing disable ts
                    // and fail if index timestamp > upperBoundOfRebuild.
                    List<Pair<PTable,Long>> indexesToPartiallyRebuild = dataTableToIndexesMap.get(dataPTable);
                    if (indexesToPartiallyRebuild == null) {
                        indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(dataPTable.getIndexes().size());
                        dataTableToIndexesMap.put(dataPTable, indexesToPartiallyRebuild);
                    }
                    LOGGER.debug("We have found " + indexPTable.getIndexState() + " Index:" +
                            indexPTable.getName() + " on data table:" + dataPTable.getName() +
                            " which failed to be updated at "
                            + indexPTable.getIndexDisableTimestamp());
                    indexesToPartiallyRebuild.add(new Pair<PTable,Long>(indexPTable,upperBoundOfRebuild));
                } while (hasMore);

				// Phase 2: rebuild the queued indexes, one data table at a time. The map is null
				// when no candidate row was seen, in which case there is nothing to do.
				if (dataTableToIndexesMap != null) {
                    // Backward overlap falls back to INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB
                    // and then to the built-in default.
                    long backwardOverlapDurationMs = env.getConfiguration().getLong(
							QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_BACKWARD_TIME_ATTRIB,
							env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, 
							        QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_BACKWARD_TIME));
					for (Map.Entry<PTable, List<Pair<PTable,Long>>> entry : dataTableToIndexesMap.entrySet()) {
						PTable dataPTable = entry.getKey();
						List<Pair<PTable,Long>> pairs = entry.getValue();
                        List<PTable> indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(pairs.size());
						// NOTE(review): metaTable is not referenced anywhere in this block; it is
						// only opened here and closed by the try-with-resources. Confirm whether
						// it is still needed.
						try (
                        Table metaTable = env.getConnection().getTable(
								SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props))) {
							// MAX_VALUE / MIN_VALUE act as "nothing seen yet" sentinels.
							long earliestDisableTimestamp = Long.MAX_VALUE;
                            long latestUpperBoundTimestamp = Long.MIN_VALUE;
							List<IndexMaintainer> maintainers = Lists
									.newArrayListWithExpectedSize(pairs.size());
							// Sign (-1, 0 or +1) of the last non-zero disable timestamp seen; it is
							// re-applied when an advanced disable timestamp is written back below.
							int signOfDisableTimeStamp = 0;
							for (Pair<PTable,Long> pair : pairs) {
					            // We need a way of differentiating the block writes to data table case from
					            // the leave index active case. In either case, we need to know the time stamp
					            // at which writes started failing so we can rebuild from that point. If we
					            // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES,
					            // then writes to the data table will be blocked (this is client side logic
					            // and we can't change this in a minor release). So we use the sign of the
					            // time stamp to differentiate.
							    PTable index = pair.getFirst();
							    Long upperBoundTimestamp = pair.getSecond();
								long disabledTimeStampVal = index.getIndexDisableTimestamp();
								if (disabledTimeStampVal != 0) {
                                    // A mix of signs is only warned about; processing continues.
                                    if (signOfDisableTimeStamp != 0 && signOfDisableTimeStamp != Long.signum(disabledTimeStampVal)) {
                                        LOGGER.warn("Found unexpected mix of signs with " +
                                                "INDEX_DISABLE_TIMESTAMP for " +
                                                dataPTable.getName().getString() + " with " +
                                                indexesToPartiallyRebuild);
                                    }
								    signOfDisableTimeStamp = Long.signum(disabledTimeStampVal);
	                                disabledTimeStampVal = Math.abs(disabledTimeStampVal);
									// Track the earliest (absolute) disable timestamp across the indexes.
									if (disabledTimeStampVal < earliestDisableTimestamp) {
										earliestDisableTimestamp = disabledTimeStampVal;
									}

									indexesToPartiallyRebuild.add(index);
									maintainers.add(index.getIndexMaintainer(dataPTable, conn));
								}
								// The upper bound is tracked for every pair, including indexes whose
								// disable timestamp is 0.
								if (upperBoundTimestamp > latestUpperBoundTimestamp) {
								    latestUpperBoundTimestamp = upperBoundTimestamp;
								}
							}
							// No indexes are disabled, so skip this table
							if (earliestDisableTimestamp == Long.MAX_VALUE) {
                                LOGGER.debug("No indexes are disabled so continuing");
								continue;
							}
							// Start the scan backwardOverlapDurationMs before the earliest failure
							// (clamped at 0) and end it at the smaller of the latest upper bound and
							// the value returned by getTimestampForBatch.
							long scanBeginTime = Math.max(0, earliestDisableTimestamp - backwardOverlapDurationMs);
                            long scanEndTime = Math.min(latestUpperBoundTimestamp,
                                    getTimestampForBatch(scanBeginTime,batchExecutedPerTableMap.get(dataPTable.getName())));
                            LOGGER.info("Starting to build " + dataPTable + " indexes "
                                    + indexesToPartiallyRebuild + " from timestamp=" +
                                    scanBeginTime + " until " + scanEndTime);
							
							TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false);
							// TODO Need to set high timeout
							PostDDLCompiler compiler = new PostDDLCompiler(conn);
							MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null, scanEndTime);
							// dataTableScan is built from the plan's scan; the settings below are
							// applied to it before plan.execute() is called.
							Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(), maintainers);

							dataTableScan.setTimeRange(scanBeginTime, scanEndTime);
							dataTableScan.setCacheBlocks(false);
							dataTableScan.setAttribute(BaseScannerRegionObserverConstants.REBUILD_INDEXES, TRUE_BYTES);

							// Serialize the metadata of the indexes being rebuilt and attach it to
							// the scan as the INDEX_PROTO_MD attribute.
							ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
									ByteUtil.EMPTY_BYTE_ARRAY);
							IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild,
									conn);
							byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
                            // TODO : use array of index names as Scan attribute for only
                            // specific index maintainer lookup at the server side.
                            // ScanUtil.setWALAnnotationAttributes(dataPTable, dataTableScan);
							dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
							ScanUtil.setClientVersion(dataTableScan, MetaDataProtocol.PHOENIX_VERSION);
                            LOGGER.info("Starting to partially build indexes:" + indexesToPartiallyRebuild
                                    + " on data table:" + dataPTable.getName() + " with the earliest disable timestamp:"
                                    + earliestDisableTimestamp + " till "
                                    + (scanEndTime == HConstants.LATEST_TIMESTAMP ? "LATEST_TIMESTAMP" : scanEndTime));
							MutationState mutationState = plan.execute();
							long rowCount = mutationState.getUpdateCount();
							decrementIndexesPendingDisableCount(conn, dataPTable, indexesToPartiallyRebuild);
                            // Reaching the latest upper bound means this batch covered everything
                            // that had to be rebuilt for this data table.
                            if (scanEndTime == latestUpperBoundTimestamp) {
                                LOGGER.info("Rebuild completed for all inactive/disabled indexes in data table:"
                                        + dataPTable.getName());
                            }
                            LOGGER.info(" no. of datatable rows read in rebuilding process is " + rowCount);
							// Record the outcome on each rebuilt index individually; a failure for
							// one index is logged and does not stop the others.
							for (PTable indexPTable : indexesToPartiallyRebuild) {
								String indexTableFullName = SchemaUtil.getTableName(
										indexPTable.getSchemaName().getString(),
										indexPTable.getTableName().getString());
								try {
								    if (scanEndTime == latestUpperBoundTimestamp) {
								        // Rebuild finished: activate the index with a disable timestamp
								        // of 0 and drop the batch counter kept for this data table.
								        IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.ACTIVE, 0L,
								            latestUpperBoundTimestamp);
								        batchExecutedPerTableMap.remove(dataPTable.getName());
                                        LOGGER.info("Making Index:" + indexPTable.getTableName() + " active after rebuilding");
								    } else {
								        // Increment timestamp so that client sees updated disable timestamp
								        IndexUtil.updateIndexState(conn, indexTableFullName, indexPTable.getIndexState(),
								            scanEndTime * signOfDisableTimeStamp, latestUpperBoundTimestamp);
								        // NOTE(review): this counter is keyed by data table but is
								        // incremented once per index in this loop - confirm that is intended.
								        Long noOfBatches = batchExecutedPerTableMap.get(dataPTable.getName());
								        if (noOfBatches == null) {
								            noOfBatches = 0l;
								        }
								        batchExecutedPerTableMap.put(dataPTable.getName(), ++noOfBatches);
                                        LOGGER.info(
								            "During Round-robin build: Successfully updated index disabled timestamp  for "
								                + indexTableFullName + " to " + scanEndTime);
								    }
								} catch (SQLException e) {
                                    LOGGER.error("Unable to rebuild " + dataPTable + " index " + indexTableFullName, e);
								}
							}
						} catch (Exception e) {
                            // A failure for one data table is logged; the loop moves on to the next.
                            LOGGER.error("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild, e);
						}
					}
				}
			} catch (Throwable t) {
                // Catch-all so that no failure escapes run(); the failure is only logged.
                LOGGER.warn("ScheduledBuildIndexTask failed!", t);
			} finally {
				// Close failures are logged at debug level and otherwise ignored.
				if (scanner != null) {
					try {
						scanner.close();
					} catch (IOException ignored) {
                        LOGGER.debug("ScheduledBuildIndexTask can't close scanner.", ignored);
					}
				}
				if (conn != null) {
					try {
						conn.close();
					} catch (SQLException ignored) {
                        LOGGER.debug("ScheduledBuildIndexTask can't close connection", ignored);
					}
				}
			}
        }