static MY_ATTRIBUTE()

in storage/innobase/row/row0merge.cc [1676:2688]


/** Scan the clustered index of old_table once and, for every visible
record, build the corresponding entries for all n_index indexes.
Entries are collected in per-index sort buffers. A full buffer is either
sorted and written to the index's merge file, or (when skip_pk_sort
applies to index[0], or when all rows of an index fit in one buffer and
no temporary file is in use) inserted directly through BtrBulk.
Spatial index entries are cached in sp_tuples and inserted by
row_merge_spatial_rows(). Full-text entries are handed to the parallel
sort threads described by psort_info.

On error, trx->error_key_num is set on most error paths.

@param[in,out]	trx		transaction; trx->read_view is used when
				online, trx->mysql_thd for the tmpdir and
				for error reporting
@param[in,out]	table		MySQL table object, stored in the
				row_merge_dup_t contexts used for
				duplicate detection
@param[in]	old_table	table whose clustered index is scanned
@param[in]	new_table	table the indexes are created for; equal
				to old_table exactly when col_map is NULL
@param[in]	online		true if the scan must read through
				trx->read_view (concurrent DML allowed)
@param[in]	index		array of n_index indexes to be built
@param[in]	fts_sort_idx	index used to create the sort buffer of a
				full-text index; must be non-NULL if any
				index[] has DICT_FTS set
@param[in]	psort_info	parallel full-text sort contexts
				(fts_sort_pll_degree entries are polled)
@param[in,out]	files		array of n_index merge files
@param[in]	key_numbers	per-index key number reported in
				trx->error_key_num on DB_DUPLICATE_KEY
@param[in]	n_index		number of elements in index[], files[]
				and key_numbers[]
@param[in]	add_cols	passed to row_build_w_add_vcol();
				requires col_map (presumably the default
				values of added columns - confirm)
@param[in]	add_v		passed to row_build_w_add_vcol()
				(presumably newly added virtual
				columns - confirm)
@param[in]	col_map		map from old_table column numbers to
				new_table column numbers, with
				ULINT_UNDEFINED for dropped columns;
				NULL if old_table == new_table
@param[in]	add_autoinc	position in the built row of the column
				to fill from sequence, or ULINT_UNDEFINED
				if there is none
@param[in,out]	sequence	generator of the values written to the
				add_autoinc column
@param[in,out]	block		buffer used for writing sort buffers to
				the merge files
@param[in]	skip_pk_sort	true if index[0] is clustered and its
				records can be inserted in scan order
				without sorting
@param[in,out]	tmpfd		passed to
				row_merge_file_create_if_needed()
@param[in,out]	stage		progress reporting; n_pk_recs_inc() is
				called per scan step and inc() per page
@param[in]	eval_table	MySQL table passed to row_merge_buf_add()
				on the first attempt to add a row
				(presumably used for evaluating virtual
				columns - confirm)
@return DB_SUCCESS or an error code */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_read_clustered_index(
	trx_t*			trx,
	struct TABLE*		table,
	const dict_table_t*	old_table,
	dict_table_t*		new_table,
	bool			online,
	dict_index_t**		index,
	dict_index_t*		fts_sort_idx,
	fts_psort_t*		psort_info,
	merge_file_t*		files,
	const ulint*		key_numbers,
	ulint			n_index,
	const dtuple_t*		add_cols,
	const dict_add_v_col_t*	add_v,
	const ulint*		col_map,
	ulint			add_autoinc,
	ib_sequence_t&		sequence,
	row_merge_block_t*	block,
	bool			skip_pk_sort,
	int*			tmpfd,
	ut_stage_alter_t*	stage,
	struct TABLE*		eval_table)
{
	dict_index_t*		clust_index;	/* Clustered index */
	mem_heap_t*		row_heap;	/* Heap memory to create
						clustered index tuples */
	row_merge_buf_t**	merge_buf;	/* Temporary list for records*/
	mem_heap_t*		v_heap = NULL;	/* Heap memory to process large
						data for virtual column */
	btr_pcur_t		pcur;		/* Cursor on the clustered
						index */
	mtr_t			mtr;		/* Mini transaction */
	dberr_t			err = DB_SUCCESS;/* Return code */
	ulint			n_nonnull = 0;	/* number of columns
						changed to NOT NULL */
	ulint*			nonnull = NULL;	/* NOT NULL columns */
	dict_index_t*		fts_index = NULL;/* FTS index */
	doc_id_t		doc_id = 0;
	doc_id_t		max_doc_id = 0;
	ibool			add_doc_id = FALSE;
	os_event_t		fts_parallel_sort_event = NULL;
	ibool			fts_pll_sort = FALSE;
	int64_t			sig_count = 0;
	index_tuple_info_t**	sp_tuples = NULL;
	mem_heap_t*		sp_heap = NULL;
	ulint			num_spatial = 0;
	BtrBulk*		clust_btr_bulk = NULL;
	bool			clust_temp_file = false;
	mem_heap_t*		mtuple_heap = NULL;
	mtuple_t		prev_mtuple;
	mem_heap_t*		conv_heap = NULL;
	FlushObserver*		observer = trx->flush_observer;
	DBUG_ENTER("row_merge_read_clustered_index");

	ut_ad((old_table == new_table) == !col_map);
	ut_ad(!add_cols || col_map);

	trx->op_info = "reading clustered index";

#ifdef FTS_INTERNAL_DIAG_PRINT
	DEBUG_FTS_SORT_PRINT("FTS_SORT: Start Create Index\n");
#endif

	/* Create and initialize memory for record buffers */

	merge_buf = static_cast<row_merge_buf_t**>(
		ut_malloc_nokey(n_index * sizeof *merge_buf));

	row_merge_dup_t	clust_dup = {index[0], table, col_map, 0};
	dfield_t*	prev_fields;
	const ulint	n_uniq = dict_index_get_n_unique(index[0]);

	ut_ad(trx->mysql_thd != NULL);

	const char*	path = thd_innodb_tmpdir(trx->mysql_thd);

	ut_ad(!skip_pk_sort || index[0]->is_clustered());
	/* There is no previous tuple yet. */
	prev_mtuple.fields = NULL;

	for (ulint i = 0; i < n_index; i++) {
		if (index[i]->type & DICT_FTS) {

			/* We are building a FT index, make sure
			we have the temporary 'fts_sort_idx' */
			ut_a(fts_sort_idx);

			fts_index = index[i];

			merge_buf[i] = row_merge_buf_create(fts_sort_idx);

			add_doc_id = DICT_TF2_FLAG_IS_SET(
				new_table, DICT_TF2_FTS_ADD_DOC_ID);

			/* If Doc ID does not exist in the table itself,
			fetch the first FTS Doc ID */
			if (add_doc_id) {
				fts_get_next_doc_id(
					(dict_table_t*) new_table,
					&doc_id);
				ut_ad(doc_id > 0);
			}

			fts_pll_sort = TRUE;
			row_fts_start_psort(psort_info);
			fts_parallel_sort_event =
				 psort_info[0].psort_common->sort_event;
		} else {
			if (dict_index_is_spatial(index[i])) {
				num_spatial++;
			}

			merge_buf[i] = row_merge_buf_create(index[i]);
		}
	}

	if (num_spatial > 0) {
		ulint	count = 0;

		sp_heap = mem_heap_create(512);

		sp_tuples = static_cast<index_tuple_info_t**>(
			ut_malloc_nokey(num_spatial
					* sizeof(*sp_tuples)));

		/* One cache object per spatial index, in the same
		relative order as the spatial indexes in index[]. */
		for (ulint i = 0; i < n_index; i++) {
			if (dict_index_is_spatial(index[i])) {
				sp_tuples[count]
					= UT_NEW_NOKEY(
						index_tuple_info_t(
							sp_heap,
							index[i]));
				count++;
			}
		}

		ut_ad(count == num_spatial);
	}

	mtr_start(&mtr);

	/* Find the clustered index and create a persistent cursor
	based on that. */

	clust_index = const_cast<dict_table_t*>(old_table)->first_index();

	btr_pcur_open_at_index_side(
		true, clust_index, BTR_SEARCH_LEAF, &pcur, true, 0, &mtr);

	if (old_table != new_table) {
		/* The table is being rebuilt.  Identify the columns
		that were flagged NOT NULL in the new table, so that
		we can quickly check that the records in the old table
		do not violate the added NOT NULL constraints. */

		nonnull = static_cast<ulint*>(
			ut_malloc_nokey(new_table->get_n_cols()
				  * sizeof *nonnull));

		for (ulint i = 0; i < old_table->get_n_cols(); i++) {
			if (old_table->get_col(i)->prtype
			    & DATA_NOT_NULL) {
				continue;
			}

			const ulint j = col_map[i];

			if (j == ULINT_UNDEFINED) {
				/* The column was dropped. */
				continue;
			}

			if (new_table->get_col(j)->prtype
			    & DATA_NOT_NULL) {
				nonnull[n_nonnull++] = j;
			}
		}

		if (!n_nonnull) {
			ut_free(nonnull);
			nonnull = NULL;
		}
	}

	row_heap = mem_heap_create(sizeof(mrec_buf_t));

	if (dict_table_is_comp(old_table)
	    && !dict_table_is_comp(new_table)) {
		conv_heap = mem_heap_create(sizeof(mrec_buf_t));
	}

	if (skip_pk_sort) {
		prev_fields = static_cast<dfield_t*>(
			ut_malloc_nokey(n_uniq * sizeof *prev_fields));
		mtuple_heap = mem_heap_create(sizeof(mrec_buf_t));
	} else {
		prev_fields = NULL;
	}

	/* Scan the clustered index. */
	for (;;) {
		const rec_t*	rec;
		ulint*		offsets;
		const dtuple_t*	row;
		row_ext_t*	ext = NULL;
		page_cur_t*	cur	= btr_pcur_get_page_cur(&pcur);

		mem_heap_empty(row_heap);

		page_cur_move_to_next(cur);

		stage->n_pk_recs_inc();

		if (page_cur_is_after_last(cur)) {

			stage->inc();

			if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
				err = DB_INTERRUPTED;
				trx->error_key_num = 0;
				goto func_exit;
			}

			if (online && old_table != new_table) {
				err = row_log_table_get_error(clust_index);
				if (err != DB_SUCCESS) {
					trx->error_key_num = 0;
					goto func_exit;
				}
			}

#ifndef UNIV_DEBUG
# define dbug_run_purge	false
#else /* UNIV_DEBUG */
			bool	dbug_run_purge = false;
#endif /* UNIV_DEBUG */
			DBUG_EXECUTE_IF(
				"ib_purge_on_create_index_page_switch",
				dbug_run_purge = true;);

			/* Insert the cached spatial index rows. */
			bool	mtr_committed = false;

			err = row_merge_spatial_rows(
				trx->id, sp_tuples, num_spatial,
				row_heap, sp_heap, &pcur,
				&mtr, &mtr_committed);

			if (err != DB_SUCCESS) {
				goto func_exit;
			}

			if (mtr_committed) {
				goto scan_next;
			}

			if (dbug_run_purge
			    || rw_lock_get_waiters(
				    dict_index_get_lock(clust_index))) {
				/* There are waiters on the clustered
				index tree lock, likely the purge
				thread. Store and restore the cursor
				position, and yield so that scanning a
				large table will not starve other
				threads. */

				/* Store the cursor position on the last user
				record on the page. */
				btr_pcur_move_to_prev_on_page(&pcur);
				/* Leaf pages must never be empty, unless
				this is the only page in the index tree. */
				ut_ad(btr_pcur_is_on_user_rec(&pcur)
				      || btr_pcur_get_block(
					      &pcur)->page.id.page_no()
				      == clust_index->page);

				btr_pcur_store_position(&pcur, &mtr);
				mtr_commit(&mtr);

				if (dbug_run_purge) {
					/* This is for testing
					purposes only (see
					DBUG_EXECUTE_IF above).  We
					signal the purge thread and
					hope that the purge batch will
					complete before we execute
					btr_pcur_restore_position(). */
					trx_purge_run();
					os_thread_sleep(1000000);
				}

				/* Give the waiters a chance to proceed. */
				os_thread_yield();
scan_next:
				mtr_start(&mtr);
				/* Restore position on the record, or its
				predecessor if the record was purged
				meanwhile. */
				btr_pcur_restore_position(
					BTR_SEARCH_LEAF, &pcur, &mtr);
				/* Move to the successor of the
				original record. */
				if (!btr_pcur_move_to_next_user_rec(
					    &pcur, &mtr)) {
end_of_index:
					/* End of scan: the mtr, row_heap
					and nonnull are released here, so
					nothing after this point with
					row == NULL may jump to func_exit,
					which would release them again. */
					row = NULL;
					mtr_commit(&mtr);
					mem_heap_free(row_heap);
					ut_free(nonnull);
					goto write_buffers;
				}
			} else {
				page_no_t	next_page_no;
				buf_block_t*	block;

				next_page_no = btr_page_get_next(
					page_cur_get_page(cur), &mtr);

				if (next_page_no == FIL_NULL) {
					goto end_of_index;
				}

				block = page_cur_get_block(cur);
				block = btr_block_get(
					page_id_t(block->page.id.space(),
						  next_page_no),
					block->page.size,
					BTR_SEARCH_LEAF,
					clust_index, &mtr);

				btr_leaf_page_release(page_cur_get_block(cur),
						      BTR_SEARCH_LEAF, &mtr);
				page_cur_set_before_first(block, cur);
				page_cur_move_to_next(cur);

				ut_ad(!page_cur_is_after_last(cur));
			}
		}

		rec = page_cur_get_rec(cur);

		offsets = rec_get_offsets(rec, clust_index, NULL,
					  ULINT_UNDEFINED, &row_heap);

		if (online) {
			/* Perform a REPEATABLE READ.

			When rebuilding the table online,
			row_log_table_apply() must not see a newer
			state of the table when applying the log.
			This is mainly to prevent false duplicate key
			errors, because the log will identify records
			by the PRIMARY KEY, and also to prevent unsafe
			BLOB access.

			When creating a secondary index online, this
			table scan must not see records that have only
			been inserted to the clustered index, but have
			not been written to the online_log of
			index[]. If we performed READ UNCOMMITTED, it
			could happen that the ADD INDEX reaches
			ONLINE_INDEX_COMPLETE state between the time
			the DML thread has updated the clustered index
			but has not yet accessed secondary index. */
			ut_ad(MVCC::is_view_active(trx->read_view));

			if (!trx->read_view->changes_visible(
				    row_get_rec_trx_id(
					    rec, clust_index, offsets),
				    old_table->name)) {
				rec_t*	old_vers;

				row_vers_build_for_consistent_read(
					rec, &mtr, clust_index, &offsets,
					trx->read_view, &row_heap,
					row_heap, &old_vers, NULL);

				rec = old_vers;

				if (!rec) {
					continue;
				}
			}

			if (rec_get_deleted_flag(
				    rec,
				    dict_table_is_comp(old_table))) {
				/* This record was deleted in the latest
				committed version, or it was deleted and
				then reinserted-by-update before purge
				kicked in. Skip it. */
				continue;
			}

			ut_ad(!rec_offs_any_null_extern(rec, offsets));
		} else if (rec_get_deleted_flag(
				   rec, dict_table_is_comp(old_table))) {
			/* Skip delete-marked records.

			Skipping delete-marked records will make the
			created indexes unusable for transactions
			whose read views were created before the index
			creation completed, but preserving the history
			would make it tricky to detect duplicate
			keys. */
			continue;
		}

		/* When !online, we are holding a lock on old_table, preventing
		any inserts that could have written a record 'stub' before
		writing out off-page columns. */
		ut_ad(!rec_offs_any_null_extern(rec, offsets));

		/* Build a row based on the clustered index. */

		row = row_build_w_add_vcol(ROW_COPY_POINTERS, clust_index,
					   rec, offsets, new_table,
					   add_cols, add_v, col_map, &ext,
					   row_heap);
		ut_ad(row);

		/* Reject NULL values in columns that become NOT NULL. */
		for (ulint i = 0; i < n_nonnull; i++) {
			const dfield_t*	field	= &row->fields[nonnull[i]];

			ut_ad(dfield_get_type(field)->prtype & DATA_NOT_NULL);

			if (dfield_is_null(field)) {
				err = DB_INVALID_NULL;
				trx->error_key_num = 0;
				goto func_exit;
			}
		}

		/* Get the next Doc ID */
		if (add_doc_id) {
			doc_id++;
		} else {
			doc_id = 0;
		}

		if (add_autoinc != ULINT_UNDEFINED) {

			ut_ad(add_autoinc
			      < new_table->get_n_user_cols());

			const dfield_t*	dfield;

			dfield = dtuple_get_nth_field(row, add_autoinc);
			if (dfield_is_null(dfield)) {
				goto write_buffers;
			}

			const dtype_t*  dtype = dfield_get_type(dfield);
			byte*	b = static_cast<byte*>(dfield_get_data(dfield));

			if (sequence.eof()) {
				err = DB_ERROR;
				trx->error_key_num = 0;

				ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
					ER_AUTOINC_READ_FAILED, "[NULL]");

				goto func_exit;
			}

			ulonglong	value = sequence++;

			/* Write the generated value in place, in the
			storage format of the column type. */
			switch (dtype_get_mtype(dtype)) {
			case DATA_INT: {
				ibool	usign;
				ulint	len = dfield_get_len(dfield);

				usign = dtype_get_prtype(dtype) & DATA_UNSIGNED;
				mach_write_ulonglong(b, value, len, usign);

				break;
				}

			case DATA_FLOAT:
				mach_float_write(
					b, static_cast<float>(value));
				break;

			case DATA_DOUBLE:
				mach_double_write(
					b, static_cast<double>(value));
				break;

			default:
				ut_ad(0);
			}
		}

write_buffers:
		/* Build all entries for all the indexes to be created
		in a single scan of the clustered index. At the end of
		the scan this is entered with row == NULL in order to
		flush whatever is left in the buffers. */

		ulint	s_idx_cnt = 0;
		bool	skip_sort = skip_pk_sort
			&& merge_buf[0]->index->is_clustered();

		/* skip_sort can only apply to the first index. */
		for (ulint i = 0; i < n_index; i++, skip_sort = false) {
			row_merge_buf_t*	buf	= merge_buf[i];
			merge_file_t*		file	= &files[i];
			ulint			rows_added = 0;

			if (dict_index_is_spatial(buf->index)) {
				if (!row) {
					continue;
				}

				ut_ad(sp_tuples[s_idx_cnt]->get_index()
				      == buf->index);

				/* If the geometry field is invalid, report
				error. */
				if (!row_geo_field_is_valid(row, buf->index)) {
					err = DB_CANT_CREATE_GEOMETRY_OBJECT;
					break;
				}

				sp_tuples[s_idx_cnt]->add(row, ext);
				s_idx_cnt++;

				continue;
			}

			if (UNIV_LIKELY
			    (row && (rows_added = row_merge_buf_add(
					buf, fts_index, old_table, new_table,
					psort_info, row, ext, &doc_id,
					conv_heap, &err,
					&v_heap, eval_table, trx)))) {

				/* If we are creating FTS index,
				a single row can generate more
				records for tokenized word */
				file->n_rec += rows_added;

				if (err != DB_SUCCESS) {
					ut_ad(err == DB_TOO_BIG_RECORD);
					break;
				}

				if (doc_id > max_doc_id) {
					max_doc_id = doc_id;
				}

				if (buf->index->type & DICT_FTS) {
					/* Check if error occurs in child thread */
					for (ulint j = 0;
					     j < fts_sort_pll_degree; j++) {
						if (psort_info[j].error
							!= DB_SUCCESS) {
							err = psort_info[j].error;
							trx->error_key_num = i;
							break;
						}
					}

					if (err != DB_SUCCESS) {
						break;
					}
				}

				if (skip_sort) {
					ut_ad(buf->n_tuples > 0);
					const mtuple_t*	curr =
						&buf->tuples[buf->n_tuples - 1];

					ut_ad(i == 0);
					ut_ad(merge_buf[0]->index->is_clustered());
					/* Detect duplicates by comparing the
					current record with previous record.
					When temp file is not used, records
					should be in sorted order. */
					if (prev_mtuple.fields != NULL
					    && (row_mtuple_cmp(
						&prev_mtuple, curr,
						&clust_dup) == 0)) {

						err = DB_DUPLICATE_KEY;
						trx->error_key_num
							= key_numbers[0];
						goto func_exit;
					}

					prev_mtuple.fields = curr->fields;
				}

				continue;
			}

			if (err == DB_COMPUTE_VALUE_FAILED) {
				trx->error_key_num = i;
				goto func_exit;
			}

			if (buf->index->type & DICT_FTS) {
				if (!row || !doc_id) {
					continue;
				}
			}

			/* The buffer must be sufficiently large
			to hold at least one record. It may only
			be empty when we reach the end of the
			clustered index. row_merge_buf_add()
			must not have been called in this loop. */
			ut_ad(buf->n_tuples || row == NULL);

			/* We have enough data tuples to form a block.
			Sort them and write to disk if temp file is used
			or insert into index if temp file is not used. */
			ut_ad(old_table == new_table
			      ? !buf->index->is_clustered()
			      : (i == 0) == buf->index->is_clustered());

			/* We have enough data tuples to form a block.
			Sort them (if !skip_sort) and write to disk. */

			if (buf->n_tuples) {
				if (skip_sort) {
					/* Temporary File is not used.
					so insert sorted block to the index */
					if (row != NULL) {
						bool	mtr_committed = false;

						/* We have to do insert the
						cached spatial index rows, since
						after the mtr_commit, the cluster
						index page could be updated, then
						the data in cached rows become
						invalid. */
						err = row_merge_spatial_rows(
							trx->id, sp_tuples,
							num_spatial,
							row_heap, sp_heap,
							&pcur, &mtr,
							&mtr_committed);

						if (err != DB_SUCCESS) {
							goto func_exit;
						}

						/* We are not at the end of
						the scan yet. We must
						mtr_commit() in order to be
						able to call log_free_check()
						in row_merge_insert_index_tuples().
						Due to mtr_commit(), the
						current row will be invalid, and
						we must reread it on the next
						loop iteration. */
						if (!mtr_committed) {
							btr_pcur_move_to_prev_on_page(
								&pcur);
							btr_pcur_store_position(
								&pcur, &mtr);

							mtr_commit(&mtr);
						}
					}

					/* Keep a private copy of the last
					buffered key, so that the duplicate
					check still has a predecessor after
					the buffer has been emptied. */
					mem_heap_empty(mtuple_heap);
					prev_mtuple.fields = prev_fields;

					row_mtuple_create(
						&buf->tuples[buf->n_tuples - 1],
						&prev_mtuple, n_uniq,
						mtuple_heap);

					if (clust_btr_bulk == NULL) {
						clust_btr_bulk = UT_NEW_NOKEY(
							BtrBulk(index[i],
								trx->id,
								observer));

						clust_btr_bulk->init();
					} else {
						clust_btr_bulk->latch();
					}

					err = row_merge_insert_index_tuples(
						trx->id, index[i], old_table,
						-1, NULL, buf, clust_btr_bulk);

					if (row == NULL) {
						err = clust_btr_bulk->finish(
							err);
						UT_DELETE(clust_btr_bulk);
						clust_btr_bulk = NULL;
					} else {
						/* Release latches for possible
						log_free_check in spatial index
						build. */
						clust_btr_bulk->release();
					}

					if (err != DB_SUCCESS) {
						break;
					}

					if (row != NULL) {
						/* Restore the cursor on the
						previous clustered index record,
						and empty the buffer. The next
						iteration of the outer loop will
						advance the cursor and read the
						next record (the one which we
						had to ignore due to the buffer
						overflow). */
						mtr_start(&mtr);
						btr_pcur_restore_position(
							BTR_SEARCH_LEAF, &pcur,
							&mtr);
						buf = row_merge_buf_empty(buf);
						/* Restart the outer loop on the
						record. We did not insert it
						into any index yet. */
						ut_ad(i == 0);
						break;
					}
				} else if (dict_index_is_unique(buf->index)) {
					row_merge_dup_t	dup = {
						buf->index, table, col_map, 0};

					row_merge_buf_sort(buf, &dup);

					if (dup.n_dup) {
						err = DB_DUPLICATE_KEY;
						trx->error_key_num
							= key_numbers[i];
						break;
					}
				} else {
					row_merge_buf_sort(buf, NULL);
				}
			} else if (online && new_table == old_table) {
				/* Note the newest transaction that
				modified this index when the scan was
				completed. We prevent older readers
				from accessing this index, to ensure
				read consistency. */

				trx_id_t	max_trx_id;

				ut_a(row == NULL);
				rw_lock_x_lock(
					dict_index_get_lock(buf->index));
				ut_a(dict_index_get_online_status(buf->index)
				     == ONLINE_INDEX_CREATION);

				max_trx_id = row_log_get_max_trx(buf->index);

				if (max_trx_id > buf->index->trx_id) {
					buf->index->trx_id = max_trx_id;
				}

				rw_lock_x_unlock(
					dict_index_get_lock(buf->index));
			}

			/* Secondary index and clustered index which is
			not in sorted order can use the temporary file.
			Fulltext index should not use the temporary file. */
			if (!skip_sort && !(buf->index->type & DICT_FTS)) {
				/* In case we can have all rows in sort buffer,
				we can insert directly into the index without
				temporary file if clustered index does not uses
				temporary file. */
				if (row == NULL && file->fd == -1
				    && !clust_temp_file) {
					DBUG_EXECUTE_IF(
						"row_merge_write_failure",
						err = DB_TEMP_FILE_WRITE_FAIL;
						trx->error_key_num = i;
						goto all_done;);

					DBUG_EXECUTE_IF(
						"row_merge_tmpfile_fail",
						err = DB_OUT_OF_MEMORY;
						trx->error_key_num = i;
						goto all_done;);

					BtrBulk	btr_bulk(index[i], trx->id,
							 observer);
					btr_bulk.init();

					err = row_merge_insert_index_tuples(
						trx->id, index[i], old_table,
						-1, NULL, buf, &btr_bulk);

					err = btr_bulk.finish(err);

					DBUG_EXECUTE_IF(
						"row_merge_insert_big_row",
						err = DB_TOO_BIG_RECORD;);

					if (err != DB_SUCCESS) {
						break;
					}
				} else {
					if (row_merge_file_create_if_needed(
						file, tmpfd,
						buf->n_tuples, path) < 0) {
						err = DB_OUT_OF_MEMORY;
						trx->error_key_num = i;
						/* Do not jump to func_exit
						here: this branch is also
						reached with row == NULL,
						after end_of_index already
						committed the mtr and freed
						row_heap and nonnull. The
						checks after the loop pick
						all_done or func_exit as
						appropriate. */
						break;
					}

					/* Ensure that duplicates in the
					clustered index will be detected before
					inserting secondary index records. */
					if (buf->index->is_clustered()) {
						clust_temp_file = true;
					}

					ut_ad(file->n_rec > 0);

					row_merge_buf_write(buf, file, block);

					if (!row_merge_write(
						    file->fd, file->offset++,
						    block)) {
						err = DB_TEMP_FILE_WRITE_FAIL;
						trx->error_key_num = i;
						break;
					}

					UNIV_MEM_INVALID(
						&block[0], srv_sort_buf_size);
				}
			}
			merge_buf[i] = row_merge_buf_empty(buf);

			if (UNIV_LIKELY(row != NULL)) {
				/* Try writing the record again, now
				that the buffer has been written out
				and emptied. */

				/* NOTE(review): the first attempt above
				passes eval_table to row_merge_buf_add(),
				while this retry passes table; confirm
				that the difference is intended. */
				if (UNIV_UNLIKELY
				    (!(rows_added = row_merge_buf_add(
						buf, fts_index, old_table,
						new_table, psort_info, row, ext,
						&doc_id, conv_heap,
						&err, &v_heap, table, trx)))) {
					/* An empty buffer should have enough
					room for at least one record. */
					ut_error;
				}

				if (err != DB_SUCCESS) {
					break;
				}

				file->n_rec += rows_added;
			}
		}

		/* At the end of the scan the mtr, row_heap and nonnull
		were already released at end_of_index, so func_exit must
		be bypassed whether or not an error occurred. */
		if (row == NULL) {
			goto all_done;
		}

		if (err != DB_SUCCESS) {
			goto func_exit;
		}

		if (v_heap) {
			mem_heap_empty(v_heap);
		}
	}

func_exit:
	/* row_merge_spatial_rows may have committed
	the mtr	before an error occurs. */
	if (mtr.is_active()) {
		mtr_commit(&mtr);
	}
	mem_heap_free(row_heap);
	ut_free(nonnull);

all_done:
	if (clust_btr_bulk != NULL) {
		/* The bulk load object is only left behind when the
		scan was aborted by an error. */
		ut_ad(err != DB_SUCCESS);
		clust_btr_bulk->latch();
		err = clust_btr_bulk->finish(
			err);
		UT_DELETE(clust_btr_bulk);
	}

	if (prev_fields != NULL) {
		ut_free(prev_fields);
		mem_heap_free(mtuple_heap);
	}

	if (v_heap) {
		mem_heap_free(v_heap);
	}

	if (conv_heap != NULL) {
		mem_heap_free(conv_heap);
	}

#ifdef FTS_INTERNAL_DIAG_PRINT
	DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Scan Table\n");
#endif
	if (fts_pll_sort) {
		bool	all_exit = false;
		ulint	trial_count = 0;
		const ulint max_trial_count = 10000;

wait_again:
		/* Check if error occurs in child thread */
		for (ulint j = 0; j < fts_sort_pll_degree; j++) {
			if (psort_info[j].error != DB_SUCCESS) {
				err = psort_info[j].error;
				trx->error_key_num = j;
				break;
			}
		}

		/* Tell all children that parent has done scanning */
		for (ulint i = 0; i < fts_sort_pll_degree; i++) {
			if (err == DB_SUCCESS) {
				psort_info[i].state = FTS_PARENT_COMPLETE;
			} else {
				psort_info[i].state = FTS_PARENT_EXITING;
			}
		}

		/* Now wait all children to report back to be completed */
		os_event_wait_time_low(fts_parallel_sort_event,
				       1000000, sig_count);

		for (ulint i = 0; i < fts_sort_pll_degree; i++) {
			if (psort_info[i].child_status != FTS_CHILD_COMPLETE
			    && psort_info[i].child_status != FTS_CHILD_EXITING) {
				sig_count = os_event_reset(
					fts_parallel_sort_event);
				goto wait_again;
			}
		}

		/* Now all children should complete, wait a bit until
		they all finish setting the event, before we free everything.
		This has a 10 second timeout */
		do {
			all_exit = true;

			for (ulint j = 0; j < fts_sort_pll_degree; j++) {
				if (psort_info[j].child_status
				    != FTS_CHILD_EXITING) {
					all_exit = false;
					os_thread_sleep(1000);
					break;
				}
			}
			trial_count++;
		} while (!all_exit && trial_count < max_trial_count);

		if (!all_exit) {
			ib::fatal() << "Not all child sort threads exited"
				" when creating FTS index '"
				<< fts_sort_idx->name << "'";
		}
	}

#ifdef FTS_INTERNAL_DIAG_PRINT
	DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Tokenization\n");
#endif
	for (ulint i = 0; i < n_index; i++) {
		row_merge_buf_free(merge_buf[i]);
	}

	row_fts_free_pll_merge_buf(psort_info);

	ut_free(merge_buf);

	btr_pcur_close(&pcur);

	if (sp_tuples != NULL) {
		for (ulint i = 0; i < num_spatial; i++) {
			UT_DELETE(sp_tuples[i]);
		}
		ut_free(sp_tuples);

		if (sp_heap) {
			mem_heap_free(sp_heap);
		}
	}

	/* Update the next Doc ID we used. Table should be locked, so
	no concurrent DML */
	if (max_doc_id && err == DB_SUCCESS) {
		/* Sync fts cache for other fts indexes to keep all
		fts indexes consistent in sync_doc_id. */
		err = fts_sync_table(const_cast<dict_table_t*>(new_table),
				     false, true, false);

		if (err == DB_SUCCESS) {
			fts_update_next_doc_id(
				0, new_table,
				old_table->name.m_name, max_doc_id);
		}
	}

	trx->op_info = "";

	DBUG_RETURN(err);
}