in storage/myisam/mi_check.c [2670:3169]
int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
const char * name, int rep_quick, my_bool no_copy_stat)
{
int got_error;
uint i,key, total_key_length, istep;
ulong rec_length;
ha_rows start_records;
my_off_t new_header_length,del;
File new_file;
MI_SORT_PARAM *sort_param=0;
MYISAM_SHARE *share=info->s;
ulong *rec_per_key_part;
HA_KEYSEG *keyseg;
char llbuff[22];
IO_CACHE new_data_cache; /* For non-quick repair. */
IO_CACHE_SHARE io_share;
SORT_INFO sort_info;
ulonglong UNINIT_VAR(key_map);
pthread_attr_t thr_attr;
ulong max_pack_reclength;
int error;
DBUG_ENTER("mi_repair_parallel");
start_records=info->state->records;
got_error=1;
new_file= -1;
new_header_length=(param->testflag & T_UNPACK) ? 0 :
share->pack.header_length;
if (!(param->testflag & T_SILENT))
{
printf("- parallel recovering (with sort) MyISAM-table '%s'\n",name);
printf("Data records: %s\n", llstr(start_records,llbuff));
}
param->testflag|=T_REP; /* for easy checking */
if (info->s->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD))
param->testflag|=T_CALC_CHECKSUM;
/*
Quick repair (not touching data file, rebuilding indexes):
{
Read cache is (MI_CHECK *param)->read_cache using info->dfile.
}
Non-quick repair (rebuilding data file and indexes):
{
Master thread:
Read cache is (MI_CHECK *param)->read_cache using info->dfile.
Write cache is (MI_INFO *info)->rec_cache using new_file.
Slave threads:
Read cache is new_data_cache synced to master rec_cache.
The final assignment of the filedescriptor for rec_cache is done
after the cache creation.
Don't check file size on new_data_cache, as the resulting file size
is not known yet.
As rec_cache and new_data_cache are synced, write_buffer_length is
used for the read cache 'new_data_cache'. Both start at the same
position 'new_header_length'.
}
*/
DBUG_PRINT("info", ("is quick repair: %d", rep_quick));
memset(&sort_info, 0, sizeof(sort_info));
/* Initialize pthread structures before goto err. */
mysql_mutex_init(mi_key_mutex_MI_SORT_INFO_mutex,
&sort_info.mutex, MY_MUTEX_INIT_FAST);
mysql_cond_init(mi_key_cond_MI_SORT_INFO_cond, &sort_info.cond, 0);
mysql_mutex_init(mi_key_mutex_MI_CHECK_print_msg,
¶m->print_msg_mutex, MY_MUTEX_INIT_FAST);
param->need_print_msg_lock= 1;
if (!(sort_info.key_block=
alloc_key_blocks(param, (uint) param->sort_key_blocks,
share->base.max_key_block_length)) ||
init_io_cache(¶m->read_cache, info->dfile,
(uint) param->read_buffer_length,
READ_CACHE, share->pack.header_length, 1, MYF(MY_WME)) ||
(!rep_quick &&
(init_io_cache(&info->rec_cache, info->dfile,
(uint) param->write_buffer_length,
WRITE_CACHE, new_header_length, 1,
MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw) ||
init_io_cache(&new_data_cache, -1,
(uint) param->write_buffer_length,
READ_CACHE, new_header_length, 1,
MYF(MY_WME | MY_DONT_CHECK_FILESIZE)))))
goto err;
sort_info.key_block_end=sort_info.key_block+param->sort_key_blocks;
info->opt_flag|=WRITE_CACHE_USED;
info->rec_cache.file=info->dfile; /* for sort_delete_record */
if (!rep_quick)
{
/* Get real path for data file */
if ((new_file= mysql_file_create(mi_key_file_datatmp,
fn_format(param->temp_filename,
share->data_file_name, "",
DATA_TMP_EXT, 2+4),
0, param->tmpfile_createflag,
MYF(0))) < 0)
{
mi_check_print_error(param,"Can't create new tempfile: '%s'",
param->temp_filename);
goto err;
}
if (new_header_length &&
filecopy(param, new_file,info->dfile,0L,new_header_length,
"datafile-header"))
goto err;
if (param->testflag & T_UNPACK)
{
share->options&= ~HA_OPTION_COMPRESS_RECORD;
mi_int2store(share->state.header.options,share->options);
}
share->state.dellink= HA_OFFSET_ERROR;
info->rec_cache.file=new_file;
}
info->update= (short) (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
/* Optionally drop indexes and optionally modify the key_map. */
mi_drop_all_indexes(param, info, FALSE);
key_map= share->state.key_map;
if (param->testflag & T_CREATE_MISSING_KEYS)
{
/* Invert the copied key_map to recreate all disabled indexes. */
key_map= ~key_map;
}
sort_info.info=info;
sort_info.param = param;
set_data_file_type(&sort_info, share);
sort_info.dupp=0;
sort_info.buff=0;
param->read_cache.end_of_file=sort_info.filelength=
mysql_file_seek(param->read_cache.file, 0L, MY_SEEK_END, MYF(0));
if (share->data_file_type == DYNAMIC_RECORD)
rec_length= MY_MAX(share->base.min_pack_length + 1, share->base.min_block_length);
else if (share->data_file_type == COMPRESSED_RECORD)
rec_length=share->base.min_block_length;
else
rec_length=share->base.pack_reclength;
/*
+1 below is required hack for parallel repair mode.
The info->state->records value, that is compared later
to sort_info.max_records and cannot exceed it, is
increased in sort_key_write. In mi_repair_by_sort, sort_key_write
is called after sort_key_read, where the comparison is performed,
but in parallel mode master thread can call sort_key_write
before some other repair thread calls sort_key_read.
Furthermore I'm not even sure +1 would be enough.
May be sort_info.max_records shold be always set to max value in
parallel mode.
*/
sort_info.max_records=
((param->testflag & T_CREATE_MISSING_KEYS) ? info->state->records + 1:
(ha_rows) (sort_info.filelength/rec_length+1));
del=info->state->del;
param->glob_crc=0;
/* for compressed tables */
max_pack_reclength= share->base.pack_reclength;
if (share->options & HA_OPTION_COMPRESS_RECORD)
set_if_bigger(max_pack_reclength, share->max_pack_length);
if (!(sort_param=(MI_SORT_PARAM *)
my_malloc((uint) share->base.keys *
(sizeof(MI_SORT_PARAM) + max_pack_reclength),
MYF(MY_ZEROFILL))))
{
mi_check_print_error(param,"Not enough memory for key!");
goto err;
}
total_key_length=0;
rec_per_key_part= param->rec_per_key_part;
info->state->records=info->state->del=share->state.split=0;
info->state->empty=0;
for (i=key=0, istep=1 ; key < share->base.keys ;
rec_per_key_part+=sort_param[i].keyinfo->keysegs, i+=istep, key++)
{
sort_param[i].key=key;
sort_param[i].keyinfo=share->keyinfo+key;
sort_param[i].seg=sort_param[i].keyinfo->seg;
/*
Skip this index if it is marked disabled in the copied
(and possibly inverted) key_map.
*/
if (! mi_is_key_active(key_map, key))
{
/* Remember old statistics for key */
memcpy((char*) rec_per_key_part,
(char*) (share->state.rec_per_key_part+
(uint) (rec_per_key_part - param->rec_per_key_part)),
sort_param[i].keyinfo->keysegs*sizeof(*rec_per_key_part));
istep=0;
continue;
}
istep=1;
if ((!(param->testflag & T_SILENT)))
printf ("- Fixing index %d\n",key+1);
if (sort_param[i].keyinfo->flag & HA_FULLTEXT)
{
sort_param[i].key_read=sort_ft_key_read;
sort_param[i].key_write=sort_ft_key_write;
}
else
{
sort_param[i].key_read=sort_key_read;
sort_param[i].key_write=sort_key_write;
}
sort_param[i].key_cmp=sort_key_cmp;
sort_param[i].lock_in_memory=lock_memory;
sort_param[i].tmpdir=param->tmpdir;
sort_param[i].sort_info=&sort_info;
sort_param[i].master=0;
sort_param[i].fix_datafile=0;
sort_param[i].calc_checksum= 0;
sort_param[i].filepos=new_header_length;
sort_param[i].max_pos=sort_param[i].pos=share->pack.header_length;
sort_param[i].record= (((uchar *)(sort_param+share->base.keys))+
(max_pack_reclength * i));
if (!mi_alloc_rec_buff(info, -1, &sort_param[i].rec_buff))
{
mi_check_print_error(param,"Not enough memory!");
goto err;
}
sort_param[i].key_length=share->rec_reflength;
for (keyseg=sort_param[i].seg; keyseg->type != HA_KEYTYPE_END;
keyseg++)
{
sort_param[i].key_length+=keyseg->length;
if (keyseg->flag & HA_SPACE_PACK)
sort_param[i].key_length+=get_pack_length(keyseg->length);
if (keyseg->flag & (HA_BLOB_PART | HA_VAR_LENGTH_PART))
sort_param[i].key_length+=2 + MY_TEST(keyseg->length >= 127);
if (keyseg->flag & HA_NULL_PART)
sort_param[i].key_length++;
}
total_key_length+=sort_param[i].key_length;
if (sort_param[i].keyinfo->flag & HA_FULLTEXT)
{
uint ft_max_word_len_for_sort=FT_MAX_WORD_LEN_FOR_SORT*
sort_param[i].keyinfo->seg->charset->mbmaxlen;
sort_param[i].key_length+=ft_max_word_len_for_sort-HA_FT_MAXBYTELEN;
init_alloc_root(&sort_param[i].wordroot, FTPARSER_MEMROOT_ALLOC_SIZE, 0);
}
}
sort_info.total_keys=i;
sort_param[0].master= 1;
sort_param[0].fix_datafile= (my_bool)(! rep_quick);
sort_param[0].calc_checksum= MY_TEST(param->testflag & T_CALC_CHECKSUM);
if (!ftparser_alloc_param(info))
goto err;
sort_info.got_error=0;
mysql_mutex_lock(&sort_info.mutex);
/*
Initialize the I/O cache share for use with the read caches and, in
case of non-quick repair, the write cache. When all threads join on
the cache lock, the writer copies the write cache contents to the
read caches.
*/
if (i > 1)
{
if (rep_quick)
init_io_cache_share(¶m->read_cache, &io_share, NULL, i);
else
init_io_cache_share(&new_data_cache, &io_share, &info->rec_cache, i);
}
else
io_share.total_threads= 0; /* share not used */
(void) pthread_attr_init(&thr_attr);
(void) pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
for (i=0 ; i < sort_info.total_keys ; i++)
{
/*
Copy the properly initialized IO_CACHE structure so that every
thread has its own copy. In quick mode param->read_cache is shared
for use by all threads. In non-quick mode all threads but the
first copy the shared new_data_cache, which is synchronized to the
write cache of the first thread. The first thread copies
param->read_cache, which is not shared.
*/
sort_param[i].read_cache= ((rep_quick || !i) ? param->read_cache :
new_data_cache);
DBUG_PRINT("io_cache_share", ("thread: %u read_cache: 0x%lx",
i, (long) &sort_param[i].read_cache));
/*
two approaches: the same amount of memory for each thread
or the memory for the same number of keys for each thread...
In the second one all the threads will fill their sort_buffers
(and call write_keys) at the same time, putting more stress on i/o.
*/
sort_param[i].sortbuff_size=
#ifndef USING_SECOND_APPROACH
param->sort_buffer_length/sort_info.total_keys;
#else
param->sort_buffer_length*sort_param[i].key_length/total_key_length;
#endif
if ((error= mysql_thread_create(mi_key_thread_find_all_keys,
&sort_param[i].thr, &thr_attr,
thr_find_all_keys,
(void *) (sort_param+i))))
{
mi_check_print_error(param,"Cannot start a repair thread (errno= %d)",
error);
/* Cleanup: Detach from the share. Avoid others to be blocked. */
if (io_share.total_threads)
remove_io_thread(&sort_param[i].read_cache);
DBUG_PRINT("error", ("Cannot start a repair thread"));
sort_info.got_error=1;
}
else
sort_info.threads_running++;
}
(void) pthread_attr_destroy(&thr_attr);
/* waiting for all threads to finish */
while (sort_info.threads_running)
mysql_cond_wait(&sort_info.cond, &sort_info.mutex);
mysql_mutex_unlock(&sort_info.mutex);
if ((got_error= thr_write_keys(sort_param)))
{
param->retry_repair=1;
goto err;
}
got_error=1; /* Assume the following may go wrong */
if (sort_param[0].fix_datafile)
{
/*
Append some nuls to the end of a memory mapped file. Destroy the
write cache. The master thread did already detach from the share
by remove_io_thread() in sort.c:thr_find_all_keys().
*/
if (write_data_suffix(&sort_info,1) || end_io_cache(&info->rec_cache))
goto err;
if (param->testflag & T_SAFE_REPAIR)
{
/* Don't repair if we loosed more than one row */
if (info->state->records+1 < start_records)
{
info->state->records=start_records;
goto err;
}
}
share->state.state.data_file_length= info->state->data_file_length=
sort_param->filepos;
/* Only whole records */
share->state.version=(ulong) time((time_t*) 0);
/*
Exchange the data file descriptor of the table, so that we use the
new file from now on.
*/
mysql_file_close(info->dfile, MYF(0));
info->dfile=new_file;
share->data_file_type=sort_info.new_data_file_type;
share->pack.header_length=(ulong) new_header_length;
}
else
info->state->data_file_length=sort_param->max_pos;
if (rep_quick && del+sort_info.dupp != info->state->del)
{
mi_check_print_error(param,"Couldn't fix table with quick recovery: Found wrong number of deleted records");
mi_check_print_error(param,"Run recovery again without -q");
param->retry_repair=1;
param->testflag|=T_RETRY_WITHOUT_QUICK;
goto err;
}
if (rep_quick & T_FORCE_UNIQUENESS)
{
my_off_t skr=info->state->data_file_length+
(share->options & HA_OPTION_COMPRESS_RECORD ?
MEMMAP_EXTRA_MARGIN : 0);
#ifdef USE_RELOC
if (share->data_file_type == STATIC_RECORD &&
skr < share->base.reloc*share->base.min_pack_length)
skr=share->base.reloc*share->base.min_pack_length;
#endif
if (skr != sort_info.filelength)
if (mysql_file_chsize(info->dfile, skr, 0, MYF(0)))
mi_check_print_warning(param,
"Can't change size of datafile, error: %d",
my_errno);
}
if (param->testflag & T_CALC_CHECKSUM)
info->state->checksum=param->glob_crc;
if (mysql_file_chsize(share->kfile, info->state->key_file_length, 0, MYF(0)))
mi_check_print_warning(param,
"Can't change size of indexfile, error: %d", my_errno);
if (!(param->testflag & T_SILENT))
{
if (start_records != info->state->records)
printf("Data records: %s\n", llstr(info->state->records,llbuff));
if (sort_info.dupp)
mi_check_print_warning(param,
"%s records have been removed",
llstr(sort_info.dupp,llbuff));
}
if ((got_error = mi_notify_file_length_change(info)))
goto err;
if (&share->state.state != info->state)
memcpy(&share->state.state, info->state, sizeof(*info->state));
err:
got_error|= flush_blocks(param, share->key_cache, share->kfile);
/*
Destroy the write cache. The master thread did already detach from
the share by remove_io_thread() or it was not yet started (if the
error happend before creating the thread).
*/
(void) end_io_cache(&info->rec_cache);
/*
Destroy the new data cache in case of non-quick repair. All slave
threads did either detach from the share by remove_io_thread()
already or they were not yet started (if the error happend before
creating the threads).
*/
if (!rep_quick)
(void) end_io_cache(&new_data_cache);
if (!got_error)
{
/* Replace the actual file with the temporary file */
if (new_file >= 0)
{
myf flags= 0;
if (param->testflag & T_BACKUP_DATA)
flags |= MY_REDEL_MAKE_BACKUP;
if (no_copy_stat)
flags |= MY_REDEL_NO_COPY_STAT;
mysql_file_close(new_file, MYF(0));
info->dfile=new_file= -1;
if (change_to_newfile(share->data_file_name, MI_NAME_DEXT, DATA_TMP_EXT,
flags) ||
mi_open_datafile(info,share,name,-1))
got_error=1;
}
}
if (got_error)
{
if (! param->error_printed)
mi_check_print_error(param,"%d when fixing table",my_errno);
if (new_file >= 0)
{
(void) mysql_file_close(new_file, MYF(0));
(void) mysql_file_delete(mi_key_file_datatmp,
param->temp_filename, MYF(MY_WME));
if (info->dfile == new_file) /* Retry with key cache */
if (unlikely(mi_open_datafile(info, share, name, -1)))
param->retry_repair= 0; /* Safety */
}
mi_mark_crashed_on_repair(info);
}
else if (key_map == share->state.key_map)
share->state.changed&= ~STATE_NOT_OPTIMIZED_KEYS;
share->state.changed|=STATE_NOT_SORTED_PAGES;
mysql_cond_destroy(&sort_info.cond);
mysql_mutex_destroy(&sort_info.mutex);
mysql_mutex_destroy(¶m->print_msg_mutex);
param->need_print_msg_lock= 0;
my_free(sort_info.ft_buf);
my_free(sort_info.key_block);
my_free(sort_param);
my_free(sort_info.buff);
(void) end_io_cache(¶m->read_cache);
info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
if (!got_error && (param->testflag & T_UNPACK))
{
share->state.header.options[0]&= (uchar) ~HA_OPTION_COMPRESS_RECORD;
share->pack.header_length=0;
}
DBUG_RETURN(got_error);
}