static void check_toast_tuple()

in contrib/amcheck/verify_heapam.c [152:542]


/*
 * Forward declarations of file-local (static) helpers.  Their definitions
 * are elsewhere in this file; the notes below state only what the
 * signatures and the visible caller, verify_heapam(), establish.
 */

/*
 * Toast checking.
 * NOTE(review): presumably check_toast_tuple() validates one toast tuple
 * belonging to the toasted attribute "ta" -- confirm against its definition.
 */
static void check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx,
							  ToastedAttribute *ta, int32 *expected_chunk_seq,
							  uint32 extsize);

/*
 * verify_heapam() calls check_toasted_attribute() once for every entry of
 * ctx->toasted_attributes, after it has released the lock on the heap page
 * being checked.
 */
static bool check_tuple_attribute(HeapCheckContext *ctx);
static void check_toasted_attribute(HeapCheckContext *ctx,
									ToastedAttribute *ta);

/*
 * Per-tuple checks operating on the tuple currently described by ctx.
 * NOTE(review): the meaning of the bool results is not visible here --
 * confirm against the definitions.
 */
static bool check_tuple_header(HeapCheckContext *ctx);
static bool check_tuple_visibility(HeapCheckContext *ctx);

/*
 * Corruption reporting.  verify_heapam() passes freshly psprintf()'d
 * strings as "msg".
 * NOTE(review): presumably the callee takes ownership of msg -- confirm.
 */
static void report_corruption(HeapCheckContext *ctx, char *msg);
static void report_toast_corruption(HeapCheckContext *ctx,
									ToastedAttribute *ta, char *msg);
/* Returns the tuple descriptor verify_heapam() installs as rsinfo->setDesc */
static TupleDesc verify_heapam_tupdesc(void);
/* Used by verify_heapam() to convert relfrozenxid into ctx.relfrozenfxid */
static FullTransactionId FullTransactionIdFromXidAndCtx(TransactionId xid,
														const HeapCheckContext *ctx);
/* Both are called once by verify_heapam() before the block scan starts */
static void update_cached_xid_range(HeapCheckContext *ctx);
static void update_cached_mxid_range(HeapCheckContext *ctx);
/*
 * Transaction and multixact ID validation helpers.
 * NOTE(review): semantics of the XidBoundsViolation results are defined
 * elsewhere -- confirm against the enum's declaration.
 */
static XidBoundsViolation check_mxid_in_range(MultiXactId mxid,
											  HeapCheckContext *ctx);
static XidBoundsViolation check_mxid_valid_in_rel(MultiXactId mxid,
												  HeapCheckContext *ctx);
static XidBoundsViolation get_xid_status(TransactionId xid,
										 HeapCheckContext *ctx,
										 XidCommitStatus *status);

/*
 * Scan and report corruption in heap pages, optionally reconciling toasted
 * attributes with entries in the associated toast table.  Intended to be
 * called from SQL with the following parameters:
 *
 *   relation:
 *     The Oid of the heap relation to be checked.
 *
 *   on_error_stop:
 *     Whether to stop at the end of the first page for which errors are
 *     detected.  Note that multiple rows may be returned.
 *
 *   check_toast:
 *     Whether to check each toasted attribute against the toast table to
 *     verify that it can be found there.
 *
 *   skip:
 *     What kinds of pages in the heap relation should be skipped.  Valid
 *     options are "all-visible", "all-frozen", and "none"; the comparison
 *     is case-insensitive.
 *
 *   starting block (fifth argument):
 *     First block to check.  NULL means block zero; otherwise the value
 *     must lie between zero and the relation's last block number.
 *
 *   ending block (sixth argument):
 *     Last block to check, inclusive.  NULL means the relation's last
 *     block; otherwise the same range restriction applies.  If the starting
 *     block is greater than the ending block, no blocks are checked and no
 *     error is raised.
 *
 * The first four arguments must not be NULL; an error is raised if any of
 * them is.  The block range arguments are validated only when the relation
 * has at least one block.
 *
 * Returns to the SQL caller a set of tuples, each containing the location
 * and a description of a corruption found in the heap.  The set is handed
 * back in materialize mode through rsinfo->setResult; the function's own
 * Datum result is always NULL.
 *
 * This code goes to some trouble to avoid crashing the server even if the
 * table pages are badly corrupted, but it's probably not perfect. If
 * check_toast is true, we'll use regular index lookups to try to fetch TOAST
 * tuples, which can certainly cause crashes if the right kind of corruption
 * exists in the toast table or index. No matter what parameters you pass,
 * we can't protect against crashes that might occur trying to look up the
 * commit status of transaction IDs (though we avoid trying to do such lookups
 * for transaction IDs that can't legally appear in the table).
 */
Datum
verify_heapam(PG_FUNCTION_ARGS)
{
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	MemoryContext old_context;
	bool		random_access;
	HeapCheckContext ctx;
	Buffer		vmbuffer = InvalidBuffer;
	Oid			relid;
	bool		on_error_stop;
	bool		check_toast;
	SkipPages	skip_option = SKIP_PAGES_NONE;
	BlockNumber first_block;
	BlockNumber last_block;
	BlockNumber nblocks;
	const char *skip;

	/* Check to see if caller supports us returning a tuplestore */
	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("set-valued function called in context that cannot accept a set")));

	/*
	 * Not being allowed to materialize is a limitation of the calling
	 * context rather than a syntax problem, so report it with the same
	 * SQLSTATE as the check above.
	 */
	if (!(rsinfo->allowedModes & SFRM_Materialize))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("materialize mode required, but it is not allowed in this context")));

	/* Check supplied arguments */
	if (PG_ARGISNULL(0))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("relation cannot be null")));
	relid = PG_GETARG_OID(0);

	if (PG_ARGISNULL(1))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("on_error_stop cannot be null")));
	on_error_stop = PG_GETARG_BOOL(1);

	if (PG_ARGISNULL(2))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("check_toast cannot be null")));
	check_toast = PG_GETARG_BOOL(2);

	if (PG_ARGISNULL(3))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("skip cannot be null")));
	skip = text_to_cstring(PG_GETARG_TEXT_PP(3));
	if (pg_strcasecmp(skip, "all-visible") == 0)
		skip_option = SKIP_PAGES_ALL_VISIBLE;
	else if (pg_strcasecmp(skip, "all-frozen") == 0)
		skip_option = SKIP_PAGES_ALL_FROZEN;
	else if (pg_strcasecmp(skip, "none") == 0)
		skip_option = SKIP_PAGES_NONE;
	else
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid skip option"),
				 errhint("Valid skip options are \"all-visible\", \"all-frozen\", and \"none\".")));

	/*
	 * Start from an all-zeroes context, so that fields not assigned
	 * explicitly below (such as is_corrupt) begin as false/zero/NULL.
	 */
	memset(&ctx, 0, sizeof(HeapCheckContext));
	ctx.cached_xid = InvalidTransactionId;
	ctx.toasted_attributes = NIL;

	/*
	 * Any xmin newer than the xmin of our snapshot can't become all-visible
	 * while we're running.
	 */
	ctx.safe_xmin = GetTransactionSnapshot()->xmin;

	/*
	 * If we report corruption when not examining some individual attribute,
	 * we need attnum to be reported as NULL.  Set that up before any
	 * corruption reporting might happen.
	 */
	ctx.attnum = -1;

	/* The tupdesc and tuplestore must be created in ecxt_per_query_memory */
	old_context = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
	random_access = (rsinfo->allowedModes & SFRM_Materialize_Random) != 0;
	ctx.tupdesc = verify_heapam_tupdesc();
	ctx.tupstore = tuplestore_begin_heap(random_access, false, work_mem);
	rsinfo->returnMode = SFRM_Materialize;
	rsinfo->setResult = ctx.tupstore;
	rsinfo->setDesc = ctx.tupdesc;
	MemoryContextSwitchTo(old_context);

	/* Open relation, check relkind and access method */
	ctx.rel = relation_open(relid, AccessShareLock);
	sanity_check_relation(ctx.rel);

	/*
	 * Early exit for unlogged relations during recovery.  These will have no
	 * relation fork, so there won't be anything to check.  We behave as if
	 * the relation is empty.
	 *
	 * The result tuplestore was already installed in rsinfo above, so the
	 * caller simply sees an empty set.
	 */
	if (ctx.rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED &&
		RecoveryInProgress())
	{
		ereport(DEBUG1,
				(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
				 errmsg("cannot verify unlogged relation \"%s\" during recovery, skipping",
						RelationGetRelationName(ctx.rel))));
		relation_close(ctx.rel, AccessShareLock);
		PG_RETURN_NULL();
	}

	/* Early exit if the relation is empty */
	nblocks = RelationGetNumberOfBlocks(ctx.rel);
	if (!nblocks)
	{
		relation_close(ctx.rel, AccessShareLock);
		PG_RETURN_NULL();
	}

	ctx.bstrategy = GetAccessStrategy(BAS_BULKREAD);
	ctx.buffer = InvalidBuffer;
	ctx.page = NULL;

	/*
	 * Validate block numbers, or handle nulls.
	 *
	 * The range checks are done on the int64 argument values, before the
	 * cast to BlockNumber, so that negative and too-large inputs are
	 * rejected rather than converted.  nblocks is known to be nonzero here,
	 * so "nblocks - 1" cannot wrap around.
	 */
	if (PG_ARGISNULL(4))
		first_block = 0;
	else
	{
		int64		fb = PG_GETARG_INT64(4);

		if (fb < 0 || fb >= nblocks)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("starting block number must be between 0 and %u",
							nblocks - 1)));
		first_block = (BlockNumber) fb;
	}
	if (PG_ARGISNULL(5))
		last_block = nblocks - 1;
	else
	{
		int64		lb = PG_GETARG_INT64(5);

		if (lb < 0 || lb >= nblocks)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("ending block number must be between 0 and %u",
							nblocks - 1)));
		last_block = (BlockNumber) lb;
	}

	/* Optionally open the toast relation, if any. */
	if (ctx.rel->rd_rel->reltoastrelid && check_toast)
	{
		int			offset;

		/* Main relation has associated toast relation */
		ctx.toast_rel = table_open(ctx.rel->rd_rel->reltoastrelid,
								   AccessShareLock);

		/*
		 * NOTE(review): toast_open_indexes() presumably returns the array
		 * position of the valid toast index -- confirm against its
		 * definition.
		 */
		offset = toast_open_indexes(ctx.toast_rel,
									AccessShareLock,
									&(ctx.toast_indexes),
									&(ctx.num_toast_indexes));
		ctx.valid_toast_index = ctx.toast_indexes[offset];
	}
	else
	{
		/*
		 * Main relation has no associated toast relation, or we're
		 * intentionally skipping it.
		 */
		ctx.toast_rel = NULL;
		ctx.toast_indexes = NULL;
		ctx.num_toast_indexes = 0;
	}

	update_cached_xid_range(&ctx);
	update_cached_mxid_range(&ctx);
	ctx.relfrozenxid = ctx.rel->rd_rel->relfrozenxid;
	ctx.relfrozenfxid = FullTransactionIdFromXidAndCtx(ctx.relfrozenxid, &ctx);
	ctx.relminmxid = ctx.rel->rd_rel->relminmxid;

	/*
	 * Use relfrozenxid as the oldest xid when it is a normal transaction
	 * ID; otherwise ctx.oldest_xid keeps whatever value it had after
	 * update_cached_xid_range().
	 */
	if (TransactionIdIsNormal(ctx.relfrozenxid))
		ctx.oldest_xid = ctx.relfrozenxid;

	for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
	{
		OffsetNumber maxoff;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Optionally skip over all-frozen or all-visible blocks.  Skipped
		 * blocks are never read.  vmbuffer is reused across iterations and
		 * released once after the loop.
		 */
		if (skip_option != SKIP_PAGES_NONE)
		{
			int32		mapbits;

			mapbits = (int32) visibilitymap_get_status(ctx.rel, ctx.blkno,
													   &vmbuffer);
			if (skip_option == SKIP_PAGES_ALL_FROZEN)
			{
				if ((mapbits & VISIBILITYMAP_ALL_FROZEN) != 0)
					continue;
			}

			if (skip_option == SKIP_PAGES_ALL_VISIBLE)
			{
				if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0)
					continue;
			}
		}

		/* Read and lock the next page. */
		ctx.buffer = ReadBufferExtended(ctx.rel, MAIN_FORKNUM, ctx.blkno,
										RBM_NORMAL, ctx.bstrategy);
		LockBuffer(ctx.buffer, BUFFER_LOCK_SHARE);
		ctx.page = BufferGetPage(ctx.buffer);

		/* Perform tuple checks */
		maxoff = PageGetMaxOffsetNumber(ctx.page);
		for (ctx.offnum = FirstOffsetNumber; ctx.offnum <= maxoff;
			 ctx.offnum = OffsetNumberNext(ctx.offnum))
		{
			ctx.itemid = PageGetItemId(ctx.page, ctx.offnum);

			/* Skip over unused/dead line pointers */
			if (!ItemIdIsUsed(ctx.itemid) || ItemIdIsDead(ctx.itemid))
				continue;

			/*
			 * If this line pointer has been redirected, check that it
			 * redirects to a valid offset within the line pointer array.
			 * Nothing further is checked for a redirected line pointer,
			 * whatever the outcome.
			 */
			if (ItemIdIsRedirected(ctx.itemid))
			{
				OffsetNumber rdoffnum = ItemIdGetRedirect(ctx.itemid);
				ItemId		rditem;

				if (rdoffnum < FirstOffsetNumber)
				{
					report_corruption(&ctx,
									  psprintf("line pointer redirection to item at offset %u precedes minimum offset %u",
											   (unsigned) rdoffnum,
											   (unsigned) FirstOffsetNumber));
					continue;
				}
				if (rdoffnum > maxoff)
				{
					report_corruption(&ctx,
									  psprintf("line pointer redirection to item at offset %u exceeds maximum offset %u",
											   (unsigned) rdoffnum,
											   (unsigned) maxoff));
					continue;
				}
				/* rdoffnum is within the line pointer array; safe to fetch */
				rditem = PageGetItemId(ctx.page, rdoffnum);
				if (!ItemIdIsUsed(rditem))
					report_corruption(&ctx,
									  psprintf("line pointer redirection to unused item at offset %u",
											   (unsigned) rdoffnum));
				continue;
			}

			/*
			 * Sanity-check the line pointer's offset and length values.
			 * Each failure is reported and the item skipped, so the tuple
			 * header is only touched once all three checks have passed.
			 */
			ctx.lp_len = ItemIdGetLength(ctx.itemid);
			ctx.lp_off = ItemIdGetOffset(ctx.itemid);

			if (ctx.lp_off != MAXALIGN(ctx.lp_off))
			{
				report_corruption(&ctx,
								  psprintf("line pointer to page offset %u is not maximally aligned",
										   ctx.lp_off));
				continue;
			}
			if (ctx.lp_len < MAXALIGN(SizeofHeapTupleHeader))
			{
				report_corruption(&ctx,
								  psprintf("line pointer length %u is less than the minimum tuple header size %u",
										   ctx.lp_len,
										   (unsigned) MAXALIGN(SizeofHeapTupleHeader)));
				continue;
			}
			if (ctx.lp_off + ctx.lp_len > BLCKSZ)
			{
				report_corruption(&ctx,
								  psprintf("line pointer to page offset %u with length %u ends beyond maximum page offset %u",
										   ctx.lp_off,
										   ctx.lp_len,
										   (unsigned) BLCKSZ));
				continue;
			}

			/* It should be safe to examine the tuple's header, at least */
			ctx.tuphdr = (HeapTupleHeader) PageGetItem(ctx.page, ctx.itemid);
			ctx.natts = HeapTupleHeaderGetNatts(ctx.tuphdr);

			/* Ok, ready to check this next tuple */
			check_tuple(&ctx);
		}

		/* clean up */
		UnlockReleaseBuffer(ctx.buffer);

		/*
		 * Check any toast pointers from the page whose lock we just released
		 */
		if (ctx.toasted_attributes != NIL)
		{
			ListCell   *cell;

			foreach(cell, ctx.toasted_attributes)
				check_toasted_attribute(&ctx, lfirst(cell));
			list_free_deep(ctx.toasted_attributes);
			ctx.toasted_attributes = NIL;
		}

		/*
		 * Tested only after this page's deferred toast checks have run, so
		 * the whole page is always processed before stopping.
		 */
		if (on_error_stop && ctx.is_corrupt)
			break;
	}

	if (vmbuffer != InvalidBuffer)
		ReleaseBuffer(vmbuffer);

	/* Close the associated toast table and indexes, if any. */
	if (ctx.toast_indexes)
		toast_close_indexes(ctx.toast_indexes, ctx.num_toast_indexes,
							AccessShareLock);
	if (ctx.toast_rel)
		table_close(ctx.toast_rel, AccessShareLock);

	/* Close the main relation */
	relation_close(ctx.rel, AccessShareLock);

	/* Results travel via rsinfo->setResult; the Datum result is unused */
	PG_RETURN_NULL();
}