mrs_plugin/db_schema/mysql_tasks.msm.project/development/mysql_tasks_next.sql (1,928 lines of code) (raw):
/*
* Copyright (c) 2021, 2025, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2.0,
* as published by the Free Software Foundation.
*
* This program is designed to work with certain software (including
* but not limited to OpenSSL) that is licensed under separate terms, as
* designated in a particular file or component or in included license
* documentation. The authors of MySQL hereby grant you an additional
* permission to link the program and your derivative works with the
* separately licensed software that they have either included with
* the program or referenced in the documentation.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
* the GNU General Public License, version 2.0, for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
-- #############################################################################
-- MSM Section 000: Database Schema Development Script
-- -----------------------------------------------------------------------------
-- This script contains the current development version of the database schema
-- `mysql_tasks`
-- #############################################################################
-- #############################################################################
-- MSM Section 010: Server Variable Settings
-- -----------------------------------------------------------------------------
-- Set server variables, remember their state to be able to restore accordingly.
-- #############################################################################
SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0;
SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0;
SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,'
'NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,'
'NO_ENGINE_SUBSTITUTION';
-- #############################################################################
-- MSM Section 110: Database Schema Creation
-- -----------------------------------------------------------------------------
-- CREATE SCHEMA statement.
-- #############################################################################
CREATE SCHEMA IF NOT EXISTS `mysql_tasks`
DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci;
-- #############################################################################
-- MSM Section 120: Database Schema Version Creation Indication
-- -----------------------------------------------------------------------------
-- Create the `${schema_name}`.`msm_schema_version` VIEW and initialize it with
-- the version 0, 0, 0 which indicates the ongoing creation processes of the
-- `${schema_name}` database schema.
-- #############################################################################
CREATE OR REPLACE SQL SECURITY INVOKER
VIEW `mysql_tasks`.`msm_schema_version` (
`major`,`minor`,`patch`) AS
SELECT 0, 0, 0;
-- #############################################################################
-- MSM Section 140: Non-idempotent Schema Objects
-- -----------------------------------------------------------------------------
-- This section contains creation of schema TABLEs and the initialization of
-- base data (standard INSERTs). It is important to note that all other
-- schema objects (VIEWs, PROCEDUREs, FUNCTIONs, TRIGGERs EVENTs, ...) need to
-- be created inside the MSM Section 150: idempotent schema object changes.
-- ROLEs and GRANTs are defined in the MSM Section 170: Authorization.
-- -----------------------------------------------------------------------------
-- CREATE TABLE statements and standard INSERTs.
-- #############################################################################
-- -----------------------------------------------------
-- Table `mysql_tasks`.`config`
-- for internal use
-- -----------------------------------------------------
CREATE TABLE IF NOT EXISTS `mysql_tasks`.`config` (
`id` TINYINT NOT NULL DEFAULT 1,
`data` JSON NULL,
PRIMARY KEY (`id`),
CONSTRAINT Config_OnlyOneRow CHECK (id = 1)
) ENGINE = InnoDB;
INSERT IGNORE INTO `mysql_tasks`.`config` (`id`, `data`)
VALUES (1, '{
"limits": {
"maximumPreparedStmtAsyncTasks": 100,
"maximumHeatwaveLoadingTasks": 5
}
}'
);
-- -----------------------------------------------------
-- Table `mysql_tasks`.`task_impl`
-- for internal use
-- -----------------------------------------------------
CREATE TABLE IF NOT EXISTS `mysql_tasks`.`task_impl` (
`id` BINARY(16) NOT NULL COMMENT 'A UUID uniquely identifying the task across
replication instances in binary format. The id should be created by the
function call UUID_TO_BIN(UUID(), 1) which generates the BINARY
representation of the UUID in reverse order to improve indexing.
The field is usually hidden from end users.',
`mysql_user` VARCHAR(288) DEFAULT (CURRENT_USER()) COMMENT
'The MySQL user that created the task.',
`app_user_id` VARCHAR(255) COMMENT 'An optional ID representing a specific
application user. If set, the app_user_id will be used to filter tasks
per application users, preventing an app user to see tasks from other
app users.',
`alias` VARCHAR(16) COMMENT 'A human readable alias that allows easier
referencing of a specific task. It uses the format
{Abbreviated weekday name}-{task count per mysql_user or app_user_id if
specified}, e.g. Mon-1, Mon-2, Tue-1, etc. Please note that there is
no guarantee that the alias will be unique, while still being useful in
the majority of cases as old task are deleted after 6 days.',
`name` VARCHAR(255) NOT NULL COMMENT 'The name of the task.',
`server_uuid` BINARY(16) NOT NULL COMMENT 'The UUID of the server on which
the task has been created. It should be populated using
UUID_TO_BIN(@@server_uuid, 1).',
`connection_id` BIGINT UNSIGNED NOT NULL COMMENT 'The MySQL server
connection_id that was used to created the task.',
`task_type` VARCHAR(80) NOT NULL COMMENT 'An application defined task type,
used for filtering of tasks per type.',
`data` JSON COMMENT 'Can hold application specific data.',
`data_json_schema` JSON COMMENT 'A JSON schema defining the structure of the
data field for the given task.',
`log_data_json_schema` JSON COMMENT 'A JSON schema defining the structure of
the task_log_impl.data field for the given task.',
PRIMARY KEY(`id`),
INDEX(`mysql_user`(192)),
INDEX(`mysql_user`(176), `alias`),
INDEX(`task_type`)
) ENGINE=InnoDB;
-- -----------------------------------------------------
-- Table `mysql_tasks`.`task_log_impl`
-- for internal use
-- -----------------------------------------------------
CREATE TABLE IF NOT EXISTS `mysql_tasks`.`task_log_impl` (
`id` BINARY(16) NOT NULL COMMENT 'A UUID uniquely identifying a task log
entry. It should be created by the function call UUID_TO_BIN(UUID(), 1)
which generates the BINARY representation of the UUID in the reverse
order to improve indexing.',
`task_id` BINARY(16) NOT NULL COMMENT 'The task ID (foreign key).',
`mysql_user` VARCHAR(288) DEFAULT (CURRENT_USER()) COMMENT 'The MySQL user
that created the task / inserted the task log.',
`log_time` TIMESTAMP(6) NOT NULL COMMENT 'A timestamp when the task log entry
was inserted.',
`message` VARCHAR(2000) COMMENT 'A task log message.',
`data` JSON COMMENT 'Can hold application specific log data. It must conform
to log_data_json_schema defined in the`task_impl` table.',
`progress` SMALLINT NOT NULL DEFAULT 0 COMMENT 'A task completion progress
between 0 and 100%.',
`status` ENUM('SCHEDULED', 'RUNNING', 'COMPLETED', 'ERROR', 'CANCELLED')
NOT NULL DEFAULT 'SCHEDULED' COMMENT 'The task state. When created, a task
goes in the SCHEDULED state, then is moved to RUNNING and finally
COMPLETED state. In case of ERROR, the task status becomes ERROR. When
task is killed by the user or by the garbage collector, it gets the
CANCELLED status.',
PRIMARY KEY(`id`),
INDEX(`mysql_user`(192)),
INDEX(`log_time`),
INDEX(`status`),
CONSTRAINT `fk_task_log_task_id`
FOREIGN KEY (`task_id`)
REFERENCES `mysql_tasks`.`task_impl` (`id`)
) ENGINE=InnoDB;
-- #############################################################################
-- MSM Section 150: Idempotent Schema Objects
-- -----------------------------------------------------------------------------
-- This section contains the creation of all schema objects except TABLEs.
-- All objects must be created under the assumption that an object of the same
-- name is already present. Therefore explicit DROP IF EXISTS statements or
-- CREATE OR REPLACE clauses must be used when creating the objects.
-- -----------------------------------------------------------------------------
-- All other schema object definitions (VIEWS, PROCEDUREs, FUNCTIONs, TRIGGERs,
-- EVENTS, ...).
-- #############################################################################
DELIMITER %%
-- -----------------------------------------------------
-- Trigger `mysql_tasks`.`config_prevent_deletion`
-- Prevents deleting the only row in the config table
-- -----------------------------------------------------
DROP TRIGGER IF EXISTS `mysql_tasks`.`config_prevent_deletion`%%
CREATE TRIGGER `mysql_tasks`.`config_prevent_deletion`
BEFORE DELETE ON `mysql_tasks`.`config`
FOR EACH ROW
BEGIN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'Deletion not allowed, please update the only row.';
END%%
-- -------------------------------------------------------------------------
-- Trigger `mysql_tasks`.`config_BEFORE_UPDATE`
-- Ensures 'maximumHeatwaveLoadingTasks' <= 'maximumPreparedStmtAsyncTasks'
-- -------------------------------------------------------------------------
DROP TRIGGER IF EXISTS `mysql_tasks`.`config_BEFORE_UPDATE`%%
CREATE TRIGGER `mysql_tasks`.`config_BEFORE_UPDATE`
BEFORE UPDATE ON `mysql_tasks`.`config`
FOR EACH ROW
BEGIN
IF NOT JSON_SCHEMA_VALID('{
"type": "object",
"properties": {
"limits": {
"type": "object",
"properties": {
"maximumPreparedStmtAsyncTasks": {
"type": "integer",
"minimum": 1,
"maximum": 128
},
"maximumHeatwaveLoadingTasks": {
"type": "integer",
"minimum": 1,
"maximum": 10
}
},
"required": [
"maximumPreparedStmtAsyncTasks",
"maximumHeatwaveLoadingTasks"
],
"additionalProperties": false
}
},
"required": ["limits"],
"additionalProperties": false
}', NEW.data) THEN
-- Raise an error if validation fails
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'Incorrect value for column `data` not conforming to '
'the JSON schema';
END IF;
IF JSON_EXTRACT(NEW.data, '$.limits.maximumHeatwaveLoadingTasks') >
JSON_EXTRACT(NEW.data, '$.limits.maximumPreparedStmtAsyncTasks') THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'limits.maximumHeatwaveLoadingTasks cannot be larger '
'than limits.maximumPreparedStmtAsyncTasks';
END IF;
END%%
-- -----------------------------------------------------
-- Trigger `mysql_tasks`.`task_impl_BEFORE_INSERT`
-- Populate server_uuid and alias on insert
-- -----------------------------------------------------
DROP TRIGGER IF EXISTS `mysql_tasks`.`task_impl_BEFORE_INSERT`%%
CREATE TRIGGER `mysql_tasks`.`task_impl_BEFORE_INSERT`
BEFORE INSERT ON `mysql_tasks`.`task_impl` FOR EACH ROW
BEGIN
DECLARE day_abbr VARCHAR(3);
DECLARE max_index INT UNSIGNED;
-- Get the abbreviated day of the week
SET day_abbr = DATE_FORMAT(CURDATE(), '%a');
-- Find the next free index for the given user
SELECT
IFNULL(
MAX(
CAST(SUBSTRING_INDEX(alias, '-', -1) AS UNSIGNED)
), 0
) + 1
INTO
max_index
FROM
`mysql_tasks`.`task_impl`
WHERE
`mysql_user` = BINARY NEW.mysql_user
AND (
(NEW.app_user_id IS NOT NULL AND `app_user_id` = BINARY NEW.app_user_id)
OR
(NEW.app_user_id IS NULL AND `app_user_id` IS NULL)
)
AND alias LIKE CONCAT(day_abbr, '-%');
-- Set the alias (if not set)
SET NEW.`alias` = COALESCE(NEW.`alias`, CONCAT(day_abbr, '-', max_index));
-- Set the server uuid (if not set)
SET NEW.`server_uuid` =
COALESCE(NEW.`server_uuid`, UUID_TO_BIN(@@server_uuid, 1));
END%%
-- -----------------------------------------------------
-- Trigger `mysql_tasks`.`task_impl_BEFORE_DELETE`
-- -----------------------------------------------------
DROP TRIGGER IF EXISTS `mysql_tasks`.`task_impl_BEFORE_DELETE`%%
CREATE TRIGGER `mysql_tasks`.`task_impl_BEFORE_DELETE`
BEFORE DELETE ON `mysql_tasks`.`task_impl` FOR EACH ROW
BEGIN
DELETE FROM `mysql_tasks`.`task_log_impl`
WHERE
`task_id` = OLD.`id`;
END%%
-- -----------------------------------------------------
-- Trigger `mysql_tasks`.`task_impl_BEFORE_UPDATE`
-- -----------------------------------------------------
DROP TRIGGER IF EXISTS `mysql_tasks`.`task_impl_BEFORE_UPDATE`%%
CREATE TRIGGER `mysql_tasks`.`task_impl_BEFORE_UPDATE`
BEFORE UPDATE ON `mysql_tasks`.`task_impl` FOR EACH ROW
BEGIN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'Updates disabled on this table.';
END%%
-- -----------------------------------------------------
-- Trigger `mysql_tasks`.`task_log_impl_BEFORE_UPDATE`
-- -----------------------------------------------------
DROP TRIGGER IF EXISTS `mysql_tasks`.`task_log_impl_BEFORE_UPDATE`%%
CREATE TRIGGER `mysql_tasks`.`task_log_impl_BEFORE_UPDATE`
BEFORE UPDATE ON `mysql_tasks`.`task_log_impl` FOR EACH ROW
BEGIN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'Updates disabled on this table.';
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`extract_username`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`extract_username`%%
CREATE FUNCTION `mysql_tasks`.`extract_username`(
`mysql_user` VARCHAR(288)
)
RETURNS VARCHAR(32)
DETERMINISTIC CONTAINS SQL
SQL SECURITY INVOKER
COMMENT '
Extract username from a string holding MySQL username@hostname
'
BEGIN
RETURN (
LEFT(
mysql_user,
LENGTH(mysql_user) - LOCATE('@', REVERSE(mysql_user))
)
);
END %%
DELIMITER ;
-- -----------------------------------------------------
-- View `mysql_tasks`.`task_i`
-- Useful for inserts. Prevents an invoker from setting
-- a different mysql_user for a record.
-- Note: grant only INSERT on this view to users
-- (not SELECT)
-- for internal use
-- -----------------------------------------------------
DROP VIEW IF EXISTS `mysql_tasks`.`task_i`;
CREATE SQL SECURITY DEFINER VIEW `mysql_tasks`.`task_i` AS
SELECT
`task_impl`.`id` AS `id`,
`task_impl`.`app_user_id` AS `app_user_id`,
`task_impl`.`alias` AS `alias`,
`task_impl`.`name` AS `name`,
`task_impl`.`server_uuid` AS `server_uuid`,
`task_impl`.`connection_id` AS `connection_id`,
`task_impl`.`task_type` AS `task_type`,
`task_impl`.`data` AS `data`,
`task_impl`.`data_json_schema` AS `data_json_schema`,
`task_impl`.`log_data_json_schema` AS `log_data_json_schema`
FROM `mysql_tasks`.`task_impl`
WHERE
mysql_tasks.extract_username(`mysql_user`) =
BINARY mysql_tasks.extract_username(user());
-- -----------------------------------------------------
-- View `mysql_tasks`.`task_log_i`
-- Useful for inserts. Prevents an invoker from setting
-- a different mysql_user for a record.
-- Note: grant only INSERT on this view to users
-- (not SELECT)
-- for internal use
-- -----------------------------------------------------
DROP VIEW IF EXISTS `mysql_tasks`.`task_log_i`;
CREATE SQL SECURITY DEFINER VIEW `mysql_tasks`.`task_log_i` AS
SELECT
`task_log_impl`.`id` AS `id`,
`task_log_impl`.`task_id` AS `task_id`,
`task_log_impl`.`log_time` AS `log_time`,
`task_log_impl`.`message` AS `message`,
`task_log_impl`.`data` AS `data`,
`task_log_impl`.`progress` AS `progress`,
`task_log_impl`.`status` AS `status`
FROM `mysql_tasks`.`task_log_impl`
WHERE
mysql_tasks.extract_username(`mysql_user`) =
BINARY mysql_tasks.extract_username(user());
-- -----------------------------------------------------
-- View `mysql_tasks`.`task_status_impl`
-- for internal use
-- Provides task information from the `task_impl` table alongside the
-- data from the `task_log_impl` table such as the time of the first log entry,
-- information from the latest log entry, as well as completion estimates
-- derived using the time of task start and task progress.
-- In case the task has been started on a MySQL server with a different
-- server_uuid, some information is masked.
-- Note: Regular users do not need privileges to use it.
-- -----------------------------------------------------
DROP VIEW IF EXISTS `mysql_tasks`.`task_status_impl`;
CREATE SQL SECURITY INVOKER VIEW `mysql_tasks`.`task_status_impl` AS
SELECT
t.id AS id,
t.app_user_id AS app_user_id,
t.alias AS alias,
t.server_uuid AS server_uuid,
t.name AS name,
t.mysql_user AS mysql_user,
t.connection_id AS connection_id,
t.task_type AS task_type,
t.data AS data,
last_tl.data AS log_data,
IF (
t.server_uuid = server_info.server_uuid,
last_tl.message,
'Not started on this server.'
) AS message,
IF (
t.server_uuid = server_info.server_uuid,
last_tl.progress,
0
) AS progress,
IF (
t.server_uuid = server_info.server_uuid,
last_tl.status,
'EXTERNAL'
) AS status,
CAST(tl.first_log_time AS DATETIME) AS 'scheduled_time',
IF (
t.server_uuid = server_info.server_uuid,
CAST(tl_running.first_log_time AS DATETIME),
NULL
) AS 'starting_time',
IF (
t.server_uuid = server_info.server_uuid,
IF(
last_tl.progress = 0,
NULL,
CAST(
TIMESTAMPADD(
SECOND,
ROUND(
TIMESTAMPDIFF(
SECOND,
tl_running.first_log_time,
tl.last_log_time
) / last_tl.progress * 100
),
tl_running.first_log_time
) AS DATETIME
)
),
NULL
) AS 'estimated_completion_time',
IF (
t.server_uuid = server_info.server_uuid,
IF(
last_tl.progress = 0,
NULL,
TIMESTAMPDIFF(
SECOND,
tl_running.first_log_time,
tl.last_log_time
) / last_tl.progress * 100.0 - TIMESTAMPDIFF(
SECOND,
tl_running.first_log_time,
tl.last_log_time
)
),
NULL
) AS 'estimated_remaining_time',
IF (
t.server_uuid = server_info.server_uuid,
CONCAT(
REPEAT('#', FLOOR(last_tl.progress / 10)),
REPEAT('_', 10 - FLOOR(last_tl.progress / 10))
),
REPEAT('_', 10)
) AS 'progress_bar'
FROM
`mysql_tasks`.`task_impl` t
INNER JOIN (
SELECT
tl1.task_id,
MIN(tl1.log_time) AS first_log_time,
MAX(tl1.log_time) AS last_log_time,
COUNT(*) AS log_count
FROM
`mysql_tasks`.`task_log_impl` tl1
GROUP BY
tl1.task_id
) tl ON t.id = tl.task_id
LEFT OUTER JOIN `mysql_tasks`.`task_log_impl` last_tl
ON tl.task_id = last_tl.task_id
AND tl.last_log_time = last_tl.log_time
LEFT OUTER JOIN (
SELECT
tl2.task_id,
MIN(tl2.log_time) AS first_log_time
FROM
`mysql_tasks`.`task_log_impl` tl2
WHERE
status = 'RUNNING'
GROUP BY
tl2.task_id
) tl_running ON t.id = tl_running.task_id
JOIN (
SELECT UUID_TO_BIN(VARIABLE_VALUE, 1) AS server_uuid
FROM performance_schema.global_variables
WHERE VARIABLE_NAME = 'server_uuid'
) server_info ON 1 = 1;
DELIMITER %%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`app_task_list`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`app_task_list`%%
CREATE FUNCTION `mysql_tasks`.`app_task_list`(
`app_user_id` VARCHAR(255),
`task_type` VARCHAR(80),
`offset` INT UNSIGNED,
`limit` INT UNSIGNED
)
RETURNS JSON
READS SQL DATA
SQL SECURITY DEFINER
COMMENT '
Returns a paginated list of application tasks.
Parameters:
- app_user_id: application user id to filter the list on
- task_type: type to filter on, if NULL returns all types
- offset: pagination offset
- limit: pagination limit'
BEGIN
DECLARE tasks JSON DEFAULT NULL;
IF `task_type` IS NULL THEN
SET `task_type` = '%';
END IF;
IF `offset` IS NULL THEN
SET `offset` = 0;
END IF;
IF `limit` IS NULL THEN
SET `limit` = 20;
END IF;
SELECT JSON_ARRAYAGG(
JSON_OBJECT(
'id', BIN_TO_UUID(t.id, 1),
'name', t.name,
'connection_id', t.connection_id,
'task_type', t.task_type,
'data', t.data
)) INTO tasks
FROM (
SELECT t1.id, t1.name, t1.connection_id,
t1.task_type, t1.data
FROM
`mysql_tasks`.`task_impl` t1
WHERE
mysql_tasks.extract_username(t1.`mysql_user`) =
BINARY mysql_tasks.extract_username(user())
AND (`app_user_id` IS NULL OR t1.app_user_id = BINARY `app_user_id`)
AND t1.task_type LIKE `task_type`
ORDER BY t1.id DESC
LIMIT `limit` OFFSET `offset`
) t;
RETURN tasks;
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`task_list`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`task_list`%%
CREATE FUNCTION `mysql_tasks`.`task_list`(
`task_type` VARCHAR(80),
`offset` INT UNSIGNED,
`limit` INT UNSIGNED
)
RETURNS JSON
READS SQL DATA
SQL SECURITY INVOKER
COMMENT '
Returns a paginated list of tasks
Parameters:
- task_type: type to filter on, if NULL returns all types
- offset: pagination offset
- limit: pagination limit'
BEGIN
RETURN (
SELECT `mysql_tasks`.`app_task_list`(NULL, `task_type`, `offset`, `limit`)
);
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`app_task`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`app_task`%%
CREATE FUNCTION `mysql_tasks`.`app_task`(
app_user_id VARCHAR(255),
id_or_alias VARCHAR(36)
)
RETURNS JSON
READS SQL DATA
SQL SECURITY DEFINER
COMMENT '
Returns information about a single application task
Parameters:
- app_user_id: application user id to filter the list on
- id_or_alias: task UUID or its unique alias'
BEGIN
DECLARE tasks JSON DEFAULT NULL;
DECLARE task_id VARCHAR(36);
SELECT `mysql_tasks`.`get_task_id`(id_or_alias) INTO task_id;
SELECT JSON_OBJECT(
'id', BIN_TO_UUID(t.id, 1),
'alias', t.alias,
'name', t.name,
'connection_id', t.connection_id,
'task_type', t.task_type,
'data', t.data,
'data_json_schema', t.data_json_schema,
'log_data_json_schema', t.log_data_json_schema
) INTO tasks
FROM
`mysql_tasks`.`task_impl` t
WHERE
mysql_tasks.extract_username(t.`mysql_user`) =
BINARY mysql_tasks.extract_username(user())
AND (`app_user_id` IS NULL OR t.app_user_id = BINARY `app_user_id`)
AND t.id = UUID_TO_BIN(`task_id`, 1);
RETURN tasks;
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`task`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`task`%%
CREATE FUNCTION `mysql_tasks`.`task`(
id_or_alias VARCHAR(36)
)
RETURNS JSON
READS SQL DATA
SQL SECURITY DEFINER
COMMENT '
Returns information about a single task
Parameters:
- id_or_alias: task UUID or its unique alias'
BEGIN
RETURN (
SELECT `mysql_tasks`.`app_task`(NULL, id_or_alias)
);
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`app_task_logs`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`app_task_logs`%%
CREATE FUNCTION `mysql_tasks`.`app_task_logs`(
app_user_id VARCHAR(255),
id_or_alias VARCHAR(36),
newer_than_log_time TIMESTAMP(6)
)
RETURNS JSON
READS SQL DATA
SQL SECURITY DEFINER
COMMENT '
Returns a list of logs belonging to an application task
Parameters:
- app_user_id: application user id to filter the list on
- id_or_alias: task UUID or its unique alias
- newer_than_log_time: if not NULL, only return log entries
newer than the specified timestamp'
BEGIN
DECLARE task_logs JSON DEFAULT NULL;
DECLARE task_id VARCHAR(36);
SELECT `mysql_tasks`.`get_task_id`(id_or_alias) INTO task_id;
SELECT JSON_ARRAYAGG(
JSON_OBJECT(
'id', BIN_TO_UUID(tl.id, 1),
'task_id', BIN_TO_UUID(tl.task_id, 1),
'log_time', tl.log_time,
'message', tl.message,
'data', tl.data,
'progress', tl.progress,
'status', tl.status
)) INTO task_logs
FROM
`mysql_tasks`.`task_log_impl` tl
JOIN
`mysql_tasks`.`task_impl` t
ON
tl.task_id = t.id
WHERE
mysql_tasks.extract_username(tl.`mysql_user`) =
BINARY mysql_tasks.extract_username(user())
AND (`app_user_id` IS NULL OR t.app_user_id = BINARY `app_user_id`)
AND tl.task_id = UUID_TO_BIN(`task_id`, 1)
AND IFNULL(tl.log_time > newer_than_log_time, TRUE)
ORDER BY tl.log_time DESC;
RETURN task_logs;
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`task_logs`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`task_logs`%%
CREATE FUNCTION `mysql_tasks`.`task_logs`(
id_or_alias VARCHAR(36),
newer_than_log_time TIMESTAMP(6)
)
RETURNS JSON
READS SQL DATA
SQL SECURITY DEFINER
COMMENT '
Returns a list of logs belonging to a task
Parameters:
- id_or_alias: task UUID or its unique alias
- newer_than_log_time: if not NULL, only return log entries
newer than the specified timestamp'
BEGIN
RETURN (
SELECT `mysql_tasks`.`app_task_logs`(NULL, id_or_alias, newer_than_log_time)
);
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`app_task_status_list`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`app_task_status_list`%%
CREATE FUNCTION `mysql_tasks`.`app_task_status_list`(
`app_user_id` VARCHAR(255),
`task_type` VARCHAR(80),
`offset` INT UNSIGNED,
`limit` INT UNSIGNED
)
RETURNS JSON
READS SQL DATA
SQL SECURITY DEFINER
COMMENT '
Returns a paginated list of application task statuses
Parameters:
- app_user_id: application user id to filter the list on
- task_type: type to filter on, if NULL returns all types
- offset: pagination offset
- limit: pagination limit'
BEGIN
IF `task_type` IS NULL THEN
SET `task_type` = '%';
END IF;
IF `offset` IS NULL THEN
SET `offset` = 0;
END IF;
IF `limit` IS NULL THEN
SET `limit` = 20;
END IF;
RETURN (
SELECT JSON_ARRAYAGG(
JSON_OBJECT(
'id', BIN_TO_UUID(sq.id, 1),
'alias', sq.alias,
'server_uuid', BIN_TO_UUID(sq.server_uuid, 1),
'name', sq.name,
'task_type', sq.task_type,
'task_data', sq.data,
'data', sq.log_data,
'message', sq.message,
'progress', sq.progress,
'status', sq.status,
'scheduled_time', sq.scheduled_time,
'starting_time', sq.starting_time,
'estimated_completion_time', sq.estimated_completion_time,
'estimated_remaining_time', sq.estimated_remaining_time,
'progress_bar', sq.progress_bar,
'row_hash', MD5(CONCAT_WS(',',
sq.status, sq.message, sq.progress, sq.data, sq.log_data,
sq.scheduled_time, sq.starting_time, sq.estimated_completion_time,
sq.estimated_remaining_time
)
)
)
)
FROM (
SELECT * FROM
`mysql_tasks`.`task_status_impl` tsi
WHERE
mysql_tasks.extract_username(tsi.mysql_user) =
BINARY mysql_tasks.extract_username(SESSION_USER())
AND (`app_user_id` IS NULL OR tsi.app_user_id = BINARY `app_user_id`)
AND tsi.task_type LIKE `task_type`
ORDER BY tsi.id DESC
LIMIT `limit` OFFSET `offset`
) sq
);
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`task_status_list`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`task_status_list`%%
CREATE FUNCTION `mysql_tasks`.`task_status_list`(
`task_type` VARCHAR(80),
`offset` INT UNSIGNED,
`limit` INT UNSIGNED
)
RETURNS JSON
READS SQL DATA
SQL SECURITY DEFINER
COMMENT '
Returns a paginated list of task statuses
Parameters:
- task_type: type to filter on, if NULL returns all types
- offset: pagination offset
- limit: pagination limit'
BEGIN
RETURN (
SELECT `mysql_tasks`.`app_task_status_list`(
NULL, `task_type`, `offset`, `limit`
)
);
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`app_task_status`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`app_task_status`%%
CREATE FUNCTION `mysql_tasks`.`app_task_status`(
app_user_id VARCHAR(255),
id_or_alias VARCHAR(36)
)
RETURNS JSON
READS SQL DATA
SQL SECURITY DEFINER
COMMENT '
Returns status of an application task
Parameters:
- app_user_id: application user id to filter the list on
- id_or_alias: task UUID or its unique alias'
BEGIN
DECLARE task_id VARCHAR(36);
SELECT `mysql_tasks`.`get_task_id`(id_or_alias) INTO task_id;
RETURN (
SELECT
JSON_OBJECT(
'id', BIN_TO_UUID(sq.id, 1),
'alias', sq.alias,
'server_uuid', BIN_TO_UUID(sq.server_uuid, 1),
'name', sq.name,
'connection_id', sq.connection_id,
'task_type', sq.task_type,
'task_data', sq.data,
'data', sq.log_data,
'message', sq.message,
'progress', sq.progress,
'status', sq.status,
'scheduled_time', sq.scheduled_time,
'starting_time', sq.starting_time,
'estimated_completion_time', sq.estimated_completion_time,
'estimated_remaining_time', sq.estimated_remaining_time,
'progress_bar', sq.progress_bar,
'row_hash', MD5(CONCAT_WS(',',
sq.status, sq.message, sq.progress, sq.data, sq.log_data,
sq.scheduled_time, sq.starting_time, sq.estimated_completion_time,
sq.estimated_remaining_time
)
)
)
FROM (
SELECT * FROM
`mysql_tasks`.`task_status_impl` tsi
WHERE
mysql_tasks.extract_username(tsi.mysql_user) =
BINARY mysql_tasks.extract_username(SESSION_USER())
AND (`app_user_id` IS NULL OR tsi.app_user_id = BINARY `app_user_id`)
AND tsi.id = UUID_TO_BIN(`task_id`, 1)
) sq
);
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`task_status`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`task_status`%%
CREATE FUNCTION `mysql_tasks`.`task_status`(
id_or_alias VARCHAR(36)
)
RETURNS JSON
READS SQL DATA
SQL SECURITY DEFINER
COMMENT '
Returns task status
Parameters:
- id_or_alias: task UUID or its unique alias'
BEGIN
RETURN (
SELECT `mysql_tasks`.`app_task_status`(NULL, id_or_alias)
);
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`app_task_status_brief`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`app_task_status_brief`%%
CREATE FUNCTION `mysql_tasks`.`app_task_status_brief`(
app_user_id VARCHAR(255),
id_or_alias VARCHAR(36)
)
RETURNS JSON
READS SQL DATA
SQL SECURITY DEFINER
COMMENT '
Returns a brief status of an application task
Parameters:
- app_user_id: application user id to filter the list on
- id_or_alias: task UUID or its unique alias'
BEGIN
DECLARE task_id VARCHAR(36);
SELECT `mysql_tasks`.`get_task_id`(id_or_alias) INTO task_id;
RETURN (
SELECT
JSON_OBJECT(
'data', sq.log_data,
'message', sq.message,
'progress', sq.progress,
'status', sq.status
)
FROM (
SELECT * FROM
`mysql_tasks`.`task_status_impl` tsi
WHERE
mysql_tasks.extract_username(tsi.mysql_user) =
BINARY mysql_tasks.extract_username(SESSION_USER())
AND (`app_user_id` IS NULL OR tsi.app_user_id = BINARY `app_user_id`)
AND tsi.id = UUID_TO_BIN(`task_id`, 1)
LIMIT 1
) sq
);
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`task_status_brief`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`task_status_brief`%%
CREATE FUNCTION `mysql_tasks`.`task_status_brief`(
id_or_alias VARCHAR(36)
)
RETURNS JSON
READS SQL DATA
SQL SECURITY DEFINER
COMMENT '
Returns a brief task status
Parameters:
- id_or_alias: task UUID or its unique alias
'
BEGIN
RETURN (
SELECT `mysql_tasks`.`app_task_status_brief`(NULL, id_or_alias)
);
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`find_task_log_msg`
-- for internal use
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`find_task_log_msg`%%
CREATE FUNCTION `mysql_tasks`.`find_task_log_msg`(
task_id VARCHAR(36),
log_msg TEXT
)
RETURNS JSON
READS SQL DATA
SQL SECURITY DEFINER
BEGIN
DECLARE task_log JSON DEFAULT NULL;
SELECT JSON_OBJECT(
'id', BIN_TO_UUID(tl.id, 1),
'log_time', tl.log_time,
'data', tl.data,
'progress', tl.progress,
'status', tl.status
) INTO task_log
FROM `mysql_tasks`.`task_log_impl` tl
WHERE
tl.task_id = UUID_TO_BIN(`task_id`, 1)
AND tl.message = log_msg;
RETURN task_log;
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`get_task_log_data_json_schema`
-- for internal use
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`get_task_log_data_json_schema`%%
CREATE FUNCTION `mysql_tasks`.`get_task_log_data_json_schema`(
task_id VARCHAR(36)
)
RETURNS JSON
READS SQL DATA
SQL SECURITY DEFINER
BEGIN
IF @mysql_tasks_initiated IS NULL THEN
SIGNAL SQLSTATE 'HY000' SET
MESSAGE_TEXT = 'get_task_log_data_json_schema function should not be '
'called directly';
END IF;
RETURN (
SELECT
t.log_data_json_schema
FROM `mysql_tasks`.`task_impl` t
WHERE
t.id = UUID_TO_BIN(`task_id`, 1)
);
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`get_task_connection_id`
-- for internal use
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`get_task_connection_id`%%
CREATE FUNCTION `mysql_tasks`.`get_task_connection_id`(
task_id VARCHAR(36)
)
RETURNS BIGINT UNSIGNED
READS SQL DATA
SQL SECURITY DEFINER
BEGIN
IF @mysql_tasks_initiated IS NULL THEN
SIGNAL SQLSTATE 'HY000' SET
MESSAGE_TEXT = 'get_task_connection_id function should not be '
'called directly';
END IF;
RETURN (
SELECT
t.connection_id
FROM `mysql_tasks`.`task_impl` t
WHERE
t.id = UUID_TO_BIN(`task_id`, 1)
);
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`active_task_count`
-- for internal use
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`active_task_count`%%
CREATE FUNCTION `mysql_tasks`.`active_task_count`(task_type VARCHAR(80))
RETURNS INT UNSIGNED
READS SQL DATA
SQL SECURITY DEFINER
BEGIN
IF @mysql_tasks_initiated IS NULL THEN
SIGNAL SQLSTATE 'HY000' SET
MESSAGE_TEXT = 'active_task_count function should not be called directly';
END IF;
RETURN (
SELECT COUNT(*) FROM (
SELECT t.id
FROM `mysql_tasks`.`task_impl` t
JOIN `mysql_tasks`.`task_log_impl` tl ON t.id = tl.task_id
WHERE t.server_uuid = UUID_TO_BIN(@@server_uuid, 1)
AND (t.task_type = `task_type` OR `task_type` IS NULL)
GROUP BY t.id
HAVING SUM(tl.status IN ('COMPLETED', 'ERROR', 'CANCELLED')) = 0
AND SUM(tl.status IN ('RUNNING', 'SCHEDULED')) > 0
) active_tasks
);
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`get_app_task_ids_from_alias`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`get_app_task_ids_from_alias`%%
CREATE FUNCTION `mysql_tasks`.`get_app_task_ids_from_alias`(
app_user_id VARCHAR(255),
alias VARCHAR(16)
)
RETURNS JSON
READS SQL DATA
SQL SECURITY DEFINER
COMMENT '
Returns a list of the task UUIDs with a given alias,
belonging to a given application user.
Parameters:
- app_user_id: application user id to filter the list on
- alias: task alias'
BEGIN
RETURN (
SELECT JSON_ARRAYAGG(
BIN_TO_UUID(t.id, 1)
)
FROM
`mysql_tasks`.`task_impl` t
WHERE
mysql_tasks.extract_username(t.mysql_user) =
BINARY mysql_tasks.extract_username(SESSION_USER())
AND (`app_user_id` IS NULL OR t.app_user_id = BINARY `app_user_id`)
AND t.alias = alias
);
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`get_task_ids_from_alias`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`get_task_ids_from_alias`%%
CREATE FUNCTION `mysql_tasks`.`get_task_ids_from_alias`(
alias VARCHAR(16)
)
RETURNS JSON
READS SQL DATA
SQL SECURITY DEFINER
COMMENT '
Returns a list of the task UUIDs with a given alias.
Parameters:
- alias: task alias'
BEGIN
RETURN (
SELECT `mysql_tasks`.`get_app_task_ids_from_alias`(NULL, alias)
);
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`get_app_task_id`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`get_app_task_id`%%
CREATE FUNCTION `mysql_tasks`.`get_app_task_id`(
app_user_id VARCHAR(255),
id_or_alias VARCHAR(36)
)
RETURNS VARCHAR(36)
READS SQL DATA
SQL SECURITY INVOKER
COMMENT '
Returns an application task UUID.
Parameters:
- app_user_id: application user id to filter the list on
- id_or_alias: task UUID or its unique alias.
if UUID, returns it.
if alias, looks up its UUID.'
BEGIN
DECLARE task_ids JSON;
DECLARE task_id VARCHAR(36);
IF (CHAR_LENGTH(id_or_alias) = 36) THEN
-- If argument has the exact length of UUID, assume task_id is provided
SELECT id_or_alias INTO task_id;
ELSE
-- Otherwise, assume alias is provided
IF (CHAR_LENGTH(id_or_alias) > 16) THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'Invalid id_or_alias parameter';
END IF;
-- Replace task_alias with task_id
SELECT `mysql_tasks`.`get_app_task_ids_from_alias`(
app_user_id,
id_or_alias
) INTO task_ids;
IF JSON_LENGTH(task_ids) > 1 THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'Multiple tasks for this alias, re-run using a'
' task_id. For a list IDs: SELECT'
' mysql_tasks.get_task_ids_from_alias(alias);';
END IF;
SELECT task_ids->>'$[0]' INTO task_id;
END IF;
RETURN task_id;
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`get_task_id`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`get_task_id`%%
CREATE FUNCTION `mysql_tasks`.`get_task_id`(
id_or_alias VARCHAR(36)
)
RETURNS VARCHAR(36)
READS SQL DATA
SQL SECURITY INVOKER
COMMENT '
Returns a task UUID.
Parameters:
- id_or_alias: task UUID or its unique alias.
if UUID, returns it.
if alias, looks up its UUID.'
BEGIN
RETURN (
SELECT `mysql_tasks`.`get_app_task_id`(NULL, id_or_alias)
);
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`get_app_task_alias`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`get_app_task_alias`%%
CREATE FUNCTION `mysql_tasks`.`get_app_task_alias`(
app_user_id VARCHAR(255),
id VARCHAR(36)
)
RETURNS VARCHAR(16)
READS SQL DATA
SQL SECURITY INVOKER
COMMENT '
Returns an application task alias given its UUID.
Parameters:
- app_user_id: application user id to filter the list on
- id: task UUID'
BEGIN
DECLARE task_info JSON;
SELECT `mysql_tasks`.`app_task`(app_user_id, id) INTO task_info;
RETURN task_info->>'$.alias';
END%%
-- -----------------------------------------------------
-- Function `mysql_tasks`.`get_task_alias`
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`get_task_alias`%%
CREATE FUNCTION `mysql_tasks`.`get_task_alias`(
id VARCHAR(36)
)
RETURNS VARCHAR(16)
READS SQL DATA
SQL SECURITY INVOKER
COMMENT '
Returns a task alias given its UUID.
Parameters:
- id: task UUID'
BEGIN
RETURN (
SELECT `mysql_tasks`.`get_app_task_alias`(NULL, id)
);
END%%
-- -----------------------------------------------------
-- FUNCTION `mysql_tasks`.`quote_identifier`
-- for internal use
-- (copy of sys.quote_identifier to avoid permission issues)
-- -----------------------------------------------------
DROP FUNCTION IF EXISTS `mysql_tasks`.`quote_identifier`%%
CREATE FUNCTION `mysql_tasks`.`quote_identifier`(in_identifier TEXT)
RETURNS TEXT CHARSET UTF8MB4
SQL SECURITY INVOKER
DETERMINISTIC
NO SQL
BEGIN
IF @mysql_tasks_initiated IS NULL THEN
SIGNAL SQLSTATE 'HY000' SET
MESSAGE_TEXT = 'quote_identifier function should not be called directly';
END IF;
RETURN CONCAT('`', REPLACE(in_identifier, '`', '``'), '`');
END%%
-- -----------------------------------------------------
-- Procedure `mysql_tasks`.`create_app_task`
-- -----------------------------------------------------
DROP PROCEDURE IF EXISTS `mysql_tasks`.`create_app_task`%%
CREATE PROCEDURE `mysql_tasks`.`create_app_task`(
IN `app_user_id` VARCHAR(255),
IN `name` VARCHAR(255), IN `task_type` VARCHAR(45), IN `data` JSON,
IN `data_json_schema` JSON, IN `log_data_json_schema` JSON,
OUT `task_id` VARCHAR(36))
SQL SECURITY INVOKER
COMMENT '
Creates an application task and returns its UUID.
Parameters:
- app_user_id: application user id to filter the list on
- name: name for the task
- task_type: type for the task
- data: JSON field holding additional task data
- data_json_schema: JSON schema for the data field
- log_data_json_schema: JSON schema for task log data filed
- OUT task_id: UUID of the created task'
BEGIN
SET task_id = UUID();
CALL `mysql_tasks`.`create_app_task_with_id`(
`app_user_id`,
`task_id`,
`name`,
`task_type`,
`data`,
`data_json_schema`,
`log_data_json_schema`
);
END%%
-- -----------------------------------------------------
-- Procedure `mysql_tasks`.`create_app_task_with_id`
-- -----------------------------------------------------
DROP PROCEDURE IF EXISTS `mysql_tasks`.`create_app_task_with_id`%%
CREATE PROCEDURE `mysql_tasks`.`create_app_task_with_id`(
IN `app_user_id` VARCHAR(255),
IN `task_id` VARCHAR(36), IN `name` VARCHAR(255), IN `task_type` VARCHAR(45),
IN `data` JSON, IN `data_json_schema` JSON, IN `log_data_json_schema` JSON)
SQL SECURITY INVOKER
COMMENT '
Creates an application task given its UUID.
Parameters:
- app_user_id: application user id to filter the list on
- task_id: task UUID
- name: name for the task
- task_type: type for the task
- data: JSON field holding additional task data
- data_json_schema: JSON schema for the data field
- log_data_json_schema: JSON schema for task log data filed'
BEGIN
-- insert entry into task table
INSERT INTO `mysql_tasks`.`task_i`(`id`, `app_user_id`, `server_uuid`,
`name`, `connection_id`, `task_type`, `data`,
`data_json_schema`, `log_data_json_schema`)
VALUES (UUID_TO_BIN(task_id, 1), app_user_id, UUID_TO_BIN(@@server_uuid, 1),
`name`, CONNECTION_ID(), `task_type`, `data`,
`data_json_schema`, `log_data_json_schema`);
IF `data_json_schema` IS NOT NULL AND
NOT JSON_SCHEMA_VALID(`data_json_schema`, `data`) THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'The provided task data does not conform to the given'
' data_json_schema.',
MYSQL_ERRNO = 1108;
END IF;
CALL `mysql_tasks`.`add_task_log`(
task_id,
'Task created by user.',
NULL,
0,
'SCHEDULED'
);
END%%
-- -----------------------------------------------------
-- Procedure `mysql_tasks`.`create_task`
-- -----------------------------------------------------
DROP PROCEDURE IF EXISTS `mysql_tasks`.`create_task`%%
CREATE PROCEDURE `mysql_tasks`.`create_task`(
IN `name` VARCHAR(255),
IN `task_type` VARCHAR(45),
IN `data` JSON,
OUT `task_id` VARCHAR(36)
) SQL SECURITY INVOKER
COMMENT '
Creates a task and returns its UUID.
Parameters:
- name: name for the task
- task_type: type for the task
- data: JSON field holding additional task data
- OUT task_id: UUID of the created task'
BEGIN
CALL `mysql_tasks`.`create_app_task`(
NULL, `name`, task_type, `data`, NULL, NULL, `task_id`);
END%%
-- -----------------------------------------------------
-- Procedure `mysql_tasks`.`create_task_with_id`
-- -----------------------------------------------------
DROP PROCEDURE IF EXISTS `mysql_tasks`.`create_task_with_id`%%
CREATE PROCEDURE `mysql_tasks`.`create_task_with_id`(
IN `task_id` VARCHAR(36),
IN `name` VARCHAR(255),
IN `task_type` VARCHAR(45),
IN `data` JSON
) SQL SECURITY INVOKER
COMMENT '
Creates a task given its UUID.
Parameters:
- task_id: task UUID
- name: name for the task
- task_type: type for the task
- data: JSON field holding additional task data'
BEGIN
CALL `mysql_tasks`.`create_app_task_with_id`(
NULL, `task_id`, `name`, `task_type`, `data`, NULL, NULL);
END%%
-- -----------------------------------------------------
-- Procedure `mysql_tasks`.`add_task_log`
-- -----------------------------------------------------
DROP PROCEDURE IF EXISTS `mysql_tasks`.`add_task_log`%%
CREATE PROCEDURE `mysql_tasks`.`add_task_log`(
IN `task_id` VARCHAR(36),
IN `message` VARCHAR(2000),
IN `data` JSON,
IN `progress` SMALLINT,
IN `status` ENUM('SCHEDULED', 'RUNNING', 'COMPLETED', 'ERROR', 'CANCELLED')
) SQL SECURITY INVOKER
COMMENT '
Persists a task log entry.
Parameters:
- task_id: UUID of the task owning the log entry
- message: a log message
- data: JSON field holding additional log data
- progress: task execution progress (0 - 100)
- status: enumeration for the task status'
BEGIN
DECLARE log_id BINARY(16) DEFAULT UUID_TO_BIN(UUID(), 1);
DECLARE log_data_json_schema JSON DEFAULT NULL;
DECLARE initiate_name TEXT DEFAULT 'add_task_log';
DECLARE EXIT HANDLER FOR SQLEXCEPTION
BEGIN
IF @mysql_tasks_initiated <=> initiate_name THEN
SET @mysql_tasks_initiated = NULL;
END IF;
RESIGNAL;
END;
IF @mysql_tasks_initiated IS NULL THEN
SET @mysql_tasks_initiated = initiate_name;
END IF;
SELECT `mysql_tasks`.`get_task_log_data_json_schema`(task_id)
INTO log_data_json_schema;
IF `log_data_json_schema` IS NOT NULL
AND NOT JSON_SCHEMA_VALID(`log_data_json_schema`, `data`) THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'The provided log data does not conform to the'
' given log_data_json_schema.',
MYSQL_ERRNO = 1108;
END IF;
INSERT INTO `mysql_tasks`.`task_log_i` (
`id`, `task_id`, `log_time`, `message`, `data`, `progress`, `status`)
VALUES (
log_id, UUID_TO_BIN(task_id, 1), NOW(6), message, `data`, progress, `status`
);
IF @mysql_tasks_initiated <=> initiate_name THEN
SET @mysql_tasks_initiated = NULL;
END IF;
END%%
-- -----------------------------------------------------
-- Procedure `mysql_tasks`.`kill_app_task`
-- -----------------------------------------------------
DROP PROCEDURE IF EXISTS `mysql_tasks`.`kill_app_task`%%
CREATE PROCEDURE `mysql_tasks`.`kill_app_task`(
IN `app_user_id` VARCHAR(255),
IN `id_or_alias` VARCHAR(36)
) SQL SECURITY INVOKER
COMMENT '
Kills an application task.
Parameters:
- app_user_id: application user id to filter the list on
- id_or_alias: UUID or a unique alias of the task to kill.'
BEGIN
DECLARE task_id VARCHAR(36);
DECLARE task_status JSON DEFAULT NULL;
DECLARE status TEXT DEFAULT NULL;
DECLARE suuid VARCHAR(36) DEFAULT NULL;
DECLARE cid BIGINT UNSIGNED DEFAULT NULL;
DECLARE i INT DEFAULT 0;
DECLARE event_name TEXT DEFAULT NULL;
DECLARE initiate_name TEXT DEFAULT 'kill_app_task';
DECLARE EXIT HANDLER FOR SQLEXCEPTION
BEGIN
IF @mysql_tasks_initiated <=> initiate_name THEN
SET @mysql_tasks_initiated = NULL;
END IF;
RESIGNAL;
END;
IF @mysql_tasks_initiated IS NULL THEN
SET @mysql_tasks_initiated = initiate_name;
END IF;
SELECT `mysql_tasks`.`get_app_task_id`(app_user_id, id_or_alias) INTO task_id;
SELECT `mysql_tasks`.`app_task_status`(app_user_id, task_id) INTO task_status;
IF task_status IS NULL THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'Task does not exist or not allowed to access it.';
END IF;
SELECT
task_status->>'$.status',
task_status->>'$.server_uuid',
task_status->>'$.connection_id'
INTO status, suuid, cid;
IF suuid <> @@server_uuid THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'Tasks started on other servers cannot be killed.';
END IF;
IF (status <> 'RUNNING' AND status <> 'SCHEDULED') THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'Task inactive.';
END IF;
CALL `mysql_tasks`.`add_task_log`(
task_id, 'Cancelled by user.', NULL, 100, 'CANCELLED');
-- kill task monitor
IF JSON_LENGTH(task_status->'$.task_data.mysqlMetadata.events') > 1 THEN
CALL `mysql_tasks`.`stop_task_monitor`(
task_status->'$.task_data.mysqlMetadata.events[last]', task_id);
END IF;
-- kill process
BEGIN
-- continue on ER_NO_SUCH_THREAD
DECLARE CONTINUE HANDLER FOR 1094 BEGIN END;
SET @stmt = CONCAT('KILL ', cid);
PREPARE stmt FROM @stmt; EXECUTE stmt; DEALLOCATE PREPARE stmt;
END;
-- drop associated events
IF JSON_CONTAINS_PATH(
task_status, 'one', '$.task_data.mysqlMetadata.events') THEN
WHILE i < JSON_LENGTH(task_status->'$.task_data.mysqlMetadata.events') DO
SET event_name = JSON_UNQUOTE(
JSON_EXTRACT(
task_status->'$.task_data.mysqlMetadata.events', CONCAT('$[',i,']')
));
CALL `mysql_tasks`.`drop_event`(event_name);
SELECT i+1 INTO i;
END WHILE;
END IF;
IF @mysql_tasks_initiated <=> initiate_name THEN
SET @mysql_tasks_initiated = NULL;
END IF;
END%%
-- -----------------------------------------------------
-- Procedure `mysql_tasks`.`kill_task`
-- -----------------------------------------------------
DROP PROCEDURE IF EXISTS `mysql_tasks`.`kill_task`%%
CREATE PROCEDURE `mysql_tasks`.`kill_task`(IN `id_or_alias` VARCHAR(36))
SQL SECURITY INVOKER
COMMENT '
Kills a task.
Parameters:
- id_or_alias: UUID or a unique alias of the task to kill.'
BEGIN
CALL `mysql_tasks`.`kill_app_task`(NULL, id_or_alias);
END%%
-- -----------------------------------------------------
-- Procedure `mysql_tasks`.`drop_event`
-- for internal use
-- -----------------------------------------------------
DROP PROCEDURE IF EXISTS `mysql_tasks`.`drop_event`%%
CREATE PROCEDURE `mysql_tasks`.`drop_event`(IN `event_name` TEXT)
SQL SECURITY INVOKER
COMMENT '
Drops an event owned by a task.
Parameters:
- event_name: the name of the event to be dropped.'
BEGIN
DECLARE event_cmnt TEXT DEFAULT NULL;
DECLARE version_string VARCHAR(255);
DECLARE major INT UNSIGNED;
DECLARE minor INT UNSIGNED;
DECLARE patch INT UNSIGNED;
DECLARE task_mgmt_major INT UNSIGNED;
DECLARE task_mgmt_minor INT UNSIGNED;
DECLARE task_mgmt_patch INT UNSIGNED;
IF @mysql_tasks_initiated IS NULL THEN
SIGNAL SQLSTATE 'HY000' SET
MESSAGE_TEXT = 'drop_event procedure should not be called directly';
END IF;
-- get schema version
SELECT
v.major, v.minor, v.patch
INTO
task_mgmt_major, task_mgmt_minor, task_mgmt_patch
FROM
`mysql_tasks`.`msm_schema_version` v;
-- get comment from the runner event
SELECT
EVENT_COMMENT INTO event_cmnt
FROM
information_schema.events e
WHERE
CONCAT(mysql_tasks.quote_identifier(e.EVENT_SCHEMA),
'.', mysql_tasks.quote_identifier(e.EVENT_NAME)) = `event_name`
AND e.EVENT_COMMENT LIKE 'mysql_tasks_schema_version=%';
SET version_string = SUBSTRING_INDEX(
event_cmnt, 'mysql_tasks_schema_version=', -1); -- Get the part after '='
SET major = CAST(SUBSTRING_INDEX(version_string, '.', 1) AS UNSIGNED);
SET minor = CAST(
SUBSTRING_INDEX(
SUBSTRING_INDEX(version_string, '.', 2), '.', -1
) AS UNSIGNED
);
SET patch = CAST(SUBSTRING_INDEX(version_string, '.', -1) AS UNSIGNED);
-- drop event if it has the comment with the supported version
IF (event_cmnt IS NOT NULL)
AND (major < task_mgmt_major OR (major = task_mgmt_major
AND (minor < task_mgmt_minor OR (minor = task_mgmt_minor
AND patch <= task_mgmt_patch))))
THEN
SET @stmt = CONCAT('DROP EVENT IF EXISTS ', event_name);
PREPARE stmt FROM @stmt;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
END IF;
END%%
-- -----------------------------------------------------
-- Procedure `mysql_tasks`.`execute_prepared_stmt_from_app_async`
-- -----------------------------------------------------
DROP PROCEDURE IF EXISTS `mysql_tasks`.`execute_prepared_stmt_from_app_async`%%
CREATE PROCEDURE `mysql_tasks`.`execute_prepared_stmt_from_app_async`(
IN `sql_statements` TEXT,
IN `app_user_id` VARCHAR(255),
IN `schema_name` VARCHAR(255),
IN `task_type` VARCHAR(80),
IN `task_name` VARCHAR(255),
IN `task_data` JSON,
IN `data_json_schema` JSON,
IN `log_data_json_schema` JSON,
IN `progress_monitor_sql_statements` TEXT,
IN `progress_monitor_refresh_period` DECIMAL(5,2),
OUT task_id VARCHAR(36))
SQL SECURITY INVOKER
COMMENT '
Executes a prepared statement asynchronously from an application.
Parameters:
- sql_statements: one or multiple SQL statements (separated by ;)
that are to be executed in a detached thread
- app_user_id: application user id to filter the list on
- schema_name: name of the schema that hosts MySQL Events running
in dedicated threads
- task_type: type of the task
- task_name: name of the task
- task_data: additional data/metadata of a task
- data_json_schema: JSON schema of the task_data field
- log_data_json_schema: JSON schema for the data field for the task logs
- progress_monitor_sql_statements: one or multiple SQL statements
(separated by ;) that are executed periodically and that can update
the progress of the background task
- progress_monitor_refresh_period: interval at which progress monitor sql
statements are executed (in seconds)
- OUT task_id: UUID of the task that runs the prepared statement'
BEGIN
DECLARE event_name, progress_event_name TEXT DEFAULT NULL;
DECLARE task_mgmt_version TEXT DEFAULT NULL;
DECLARE max_parallel_tasks INT UNSIGNED DEFAULT NULL;
DECLARE active_task_cnt INT UNSIGNED DEFAULT NULL;
DECLARE internal_data JSON DEFAULT NULL;
DECLARE internal_data_json_schema JSON DEFAULT NULL;
DECLARE data_json_schema_required JSON DEFAULT NULL;
DECLARE initiate_name TEXT DEFAULT 'execute_prepared_stmt_from_app_async';
DECLARE EXIT HANDLER FOR SQLEXCEPTION
BEGIN
GET DIAGNOSTICS CONDITION 1
@p1 = RETURNED_SQLSTATE,
@p2 = MYSQL_ERRNO,
@p3 = MESSAGE_TEXT;
DO RELEASE_LOCK('execute_prepared_stmt_async');
IF @mysql_tasks_initiated <=> initiate_name THEN
SET @mysql_tasks_initiated = NULL;
END IF;
IF @p3 = 'No schema set.' THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'No schema set. Please pass in the schema name'
' or set the current schema with USE.',
MYSQL_ERRNO = 1108;
ELSE
RESIGNAL;
END IF;
END;
-- user() and current_user() are different when invoked through an event
IF (SELECT COUNT(*)>0 FROM `performance_schema`.`events_statements_current`
WHERE thread_id=PS_CURRENT_THREAD_ID() AND
event_name='statement/scheduler/event') THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'Invoking the procedure from a MySQL EVENT is not '
'supported';
END IF;
IF mysql_tasks.extract_username(SESSION_USER()) !=
mysql_tasks.extract_username(CURRENT_USER()) THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'Procedure not supported when USER() and '
'CURRENT_USER() are different';
END IF;
IF @mysql_tasks_initiated IS NULL THEN
SET @mysql_tasks_initiated = initiate_name;
END IF;
SELECT
JSON_EXTRACT(data, '$.limits.maximumPreparedStmtAsyncTasks')
INTO
max_parallel_tasks
FROM
`mysql_tasks`.`config`
WHERE id = 1
LIMIT 1;
IF progress_monitor_refresh_period IS NULL THEN
SET progress_monitor_refresh_period = 5;
ELSEIF progress_monitor_refresh_period <= 0 THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'progress_monitor_refresh_period must be a '
'positive number',
MYSQL_ERRNO = 1108;
END IF;
IF task_type IS NULL THEN
SET task_type = 'Async_SQL';
END IF;
IF task_name IS NULL THEN
SET task_name = 'execute_prepared_stmt_async';
END IF;
IF schema_name IS NULL THEN
SELECT
current_schema INTO schema_name
FROM
`performance_schema`.`events_statements_current`
WHERE
thread_id=PS_CURRENT_THREAD_ID()
AND nesting_event_level=0;
END IF;
IF schema_name IS NULL THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No schema set.',
MYSQL_ERRNO = 1108;
END IF;
-- ensure the SQL statements end with a semicolon
IF NOT REGEXP_LIKE(sql_statements, ';[:space:]*$') THEN
SET sql_statements = CONCAT(sql_statements, '; ');
END IF;
-- make sure the reserved property is not used
IF JSON_CONTAINS_PATH(
data_json_schema, 'one', '$.properties.mysqlMetadata') THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'data_json_schema must not'
' contain a reserved property ''mysqlMetadata''.', MYSQL_ERRNO = 1108;
END IF;
SET internal_data_json_schema = JSON_OBJECT(
'type', 'object',
'properties', JSON_OBJECT(
'mysqlMetadata', JSON_OBJECT(
'type', 'object',
'properties', JSON_OBJECT(
'events', JSON_OBJECT(
'type', 'array',
'items', JSON_OBJECT(
'type', 'string'
),
'minItems', 1,
'uniqueItems', true
),
'autoGc', JSON_OBJECT(
'type', 'boolean'
)
),
'required', JSON_ARRAY('events', 'autoGc')
)
),
'required', JSON_ARRAY('mysqlMetadata')
);
-- merge the provided schema and the internal schema
SET data_json_schema = COALESCE(
data_json_schema, JSON_OBJECT('required', JSON_ARRAY()));
SELECT JSON_UNQUOTE(JSON_MERGE_PRESERVE(
JSON_EXTRACT(internal_data_json_schema, '$.required'),
JSON_EXTRACT(data_json_schema, '$.required')
)) INTO data_json_schema_required;
SET data_json_schema = JSON_MERGE_PATCH(
internal_data_json_schema, data_json_schema);
SET data_json_schema = JSON_SET(
data_json_schema, '$.required', data_json_schema_required);
SELECT CONCAT(
mysql_tasks.quote_identifier(schema_name),
'.',
mysql_tasks.quote_identifier(UUID())
) INTO event_name;
SELECT CONCAT(
mysql_tasks.quote_identifier(schema_name),
'.',
mysql_tasks.quote_identifier(UUID())
) INTO progress_event_name;
SELECT CONCAT(major, '.', minor, '.', patch)
FROM `mysql_tasks`.`msm_schema_version` INTO task_mgmt_version;
SET internal_data = JSON_OBJECT(
'mysqlMetadata', JSON_OBJECT(
'events', IF (progress_monitor_sql_statements IS NULL,
JSON_ARRAY(event_name),
JSON_ARRAY(event_name, progress_event_name)
),
'autoGc', true
)
);
-- make sure reserved property is not used
IF JSON_CONTAINS_PATH(task_data, 'one', '$.mysqlMetadata') THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'task_data must not contain a'
' reserved property ''mysqlMetadata''.', MYSQL_ERRNO = 1108;
END IF;
-- merge the provided task data and the internal metadata
SET task_data = COALESCE(task_data, JSON_OBJECT());
SET task_data = JSON_MERGE_PATCH(internal_data, task_data);
IF NOT GET_LOCK('execute_prepared_stmt_async', 5) <=> 1 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Cannot acquire lock.'
' Try again later', MYSQL_ERRNO = 1205;
END IF;
-- READ COMMITTED does not acquire lock on the task_log table,
-- avoiding a potential deadlock between this SELECT and
-- INSERTS in concurrent events
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT `mysql_tasks`.`active_task_count`(NULL) INTO active_task_cnt;
COMMIT ;
IF active_task_cnt >= max_parallel_tasks THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Maximum number of parallel'
' tasks reached, try again later.', MYSQL_ERRNO = 1203;
END IF;
SET task_id = UUID();
SET @eventSql = CONCAT(
'CREATE EVENT ', event_name, ' ',
'ON SCHEDULE AT NOW() ON COMPLETION NOT PRESERVE ENABLE ',
'COMMENT ''mysql_tasks_schema_version=', task_mgmt_version, ''' ',
'DO BEGIN ',
'DECLARE initiate_name TEXT DEFAULT ''task_runner_event''; ',
'DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN ',
' GET DIAGNOSTICS CONDITION 1',
' @p1 = RETURNED_SQLSTATE,',
' @p2 = MYSQL_ERRNO,',
' @p3 = MESSAGE_TEXT; ',
' CALL `mysql_tasks`.`stop_task_monitor`(',
QUOTE(progress_event_name), ', ', QUOTE(task_id), '); ',
' CALL `mysql_tasks`.`add_task_log`(',
QUOTE(task_id), ', CONCAT(''Error: '', @p3), NULL, 100, ''ERROR''); ',
' IF @mysql_tasks_initiated <=> initiate_name THEN ',
' SET @mysql_tasks_initiated = NULL; ',
' END IF; ',
'END; ',
'SET ROLE ALL; ',
'IF @mysql_tasks_initiated IS NULL THEN ',
' SET @mysql_tasks_initiated = initiate_name; ',
'END IF; ',
'SET @task_id =', QUOTE(task_id), '; SET @task_result = NULL; ',
'CALL `mysql_tasks`.`create_app_task_with_id`(',
IF(app_user_id IS NULL, 'NULL', QUOTE(app_user_id)),
', @task_id, ',
QUOTE(task_name), ', ',
QUOTE(task_type), ', ',
QUOTE(task_data), ', ',
QUOTE(data_json_schema), ', ',
QUOTE(log_data_json_schema),
'); ',
'CALL `mysql_tasks`.`start_task_monitor`(',
QUOTE(progress_event_name), ', ',
QUOTE(task_id), ', ',
QUOTE(progress_monitor_sql_statements), ', ',
progress_monitor_refresh_period,
'); '
'CALL `mysql_tasks`.`add_task_log`(',
'@task_id, ''Event execution started...'', NULL, 0, ''RUNNING''); ',
sql_statements,
'CALL `mysql_tasks`.`stop_task_monitor`(',
QUOTE(progress_event_name), ', ', QUOTE(task_id), '); ',
'CALL `mysql_tasks`.`add_task_log`(',
'@task_id, ''Execution finished.'', ',
'CAST(@task_result AS JSON), 100, ''COMPLETED''); ',
'SET @task_id = NULL; SET @task_result = NULL; ',
'IF @mysql_tasks_initiated <=> initiate_name THEN ',
' SET @mysql_tasks_initiated = NULL; ',
'END IF; ',
'END;');
PREPARE dynamic_statement FROM @eventSql;
EXECUTE dynamic_statement;
DEALLOCATE PREPARE dynamic_statement;
-- Wait for the task to be created
WHILE (mysql_tasks.task(task_id) IS NULL) DO
DO SLEEP(0.1);
END WHILE;
-- Release lock after launching the async task
-- The while loop ensures previously acquired locks by the same session,
-- which were not released due to an abort, are released as well.
WHILE IS_USED_LOCK('execute_prepared_stmt_async') <=> CONNECTION_ID() DO
DO RELEASE_LOCK('execute_prepared_stmt_async');
END WHILE;
IF @mysql_tasks_initiated <=> initiate_name THEN
SET @mysql_tasks_initiated = NULL;
END IF;
END%%
-- -----------------------------------------------------
-- Procedure `mysql_tasks`.`execute_prepared_stmt_async`
-- -----------------------------------------------------
DROP PROCEDURE IF EXISTS `mysql_tasks`.`execute_prepared_stmt_async`%%
CREATE PROCEDURE `mysql_tasks`.`execute_prepared_stmt_async`(
IN `sql_statements` TEXT,
IN `schema_name` VARCHAR(255),
IN `task_name` VARCHAR(255),
IN `task_data` JSON,
OUT task_id VARCHAR(36))
SQL SECURITY INVOKER
COMMENT '
Executes a prepared statement asynchronously.
Parameters:
- sql_statements: one or multiple SQL statements (separated by ;)
that are to be executed in a detached thread
- schema_name: name of the schema that hosts MySQL Events running in
dedicated threads
- task_name: name of the task
- task_data: additional data/metadata of a task
- OUT task_id: UUID of the task that runs the prepared statement'
BEGIN
CALL `mysql_tasks`.`execute_prepared_stmt_from_app_async`(
`sql_statements`, NULL, `schema_name`, NULL, `task_name`,
`task_data`, NULL, NULL, NULL, NULL, task_id);
END%%
-- -----------------------------------------------------
-- Procedure `mysql_tasks`.`start_task_monitor`
-- for internal use
-- -----------------------------------------------------
DROP PROCEDURE IF EXISTS `mysql_tasks`.`start_task_monitor`%%
CREATE PROCEDURE `mysql_tasks`.`start_task_monitor`(
IN `event_name` TEXT,
IN `task_id` VARCHAR(36),
IN `sql_statements` TEXT,
IN `refresh_period` DECIMAL(5,2)
)
SQL SECURITY INVOKER
COMMENT '
Starts a task monitor SQL Event. It periodically updates the task progress.
Parameters:
- event_name: fully qualified name of the SQL Event that runs the
monitoring SQL statements
- task_id: UUID of the task being monitored
- sql_statements: the monitoring SQL statements (separated by ;)
- refresh_period: interval at which the monitoring sql statements
are executed (in seconds)'
BEGIN
DECLARE task_mgmt_version TEXT DEFAULT NULL;
IF @mysql_tasks_initiated IS NULL THEN
SIGNAL SQLSTATE 'HY000' SET
MESSAGE_TEXT = 'start_task_monitor procedure should not be called directly';
END IF;
IF sql_statements IS NOT NULL AND event_name IS NOT NULL THEN
IF refresh_period IS NULL THEN
SET refresh_period = 5;
ELSEIF refresh_period <= 0 THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'refresh_period must be a positive number',
MYSQL_ERRNO = 1108;
END IF;
SELECT
CONCAT(major, '.', minor, '.', patch)
FROM
`mysql_tasks`.`msm_schema_version` INTO task_mgmt_version;
-- ensure the SQL statements end with a semicolon
IF NOT REGEXP_LIKE(sql_statements, ';[:space:]*$') THEN
SET sql_statements = CONCAT(sql_statements, '; ');
END IF;
SET @eventSql = CONCAT(
'CREATE EVENT ', event_name, ' ',
'ON SCHEDULE AT NOW() ON COMPLETION NOT PRESERVE ENABLE ',
'COMMENT ''mysql_tasks_schema_version=', task_mgmt_version, ''' ',
'DO BEGIN ',
'DECLARE do_run BOOLEAN DEFAULT TRUE; ',
'DECLARE initiate_name TEXT DEFAULT ''task_monitor_event''; ',
'DECLARE lock_name VARCHAR(64) DEFAULT ', QUOTE(RIGHT(event_name,64)),';',
'DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN ',
' GET DIAGNOSTICS CONDITION 1 ',
' @p1 = RETURNED_SQLSTATE, @p2 = MYSQL_ERRNO, @p3 = MESSAGE_TEXT; ',
' CALL `mysql_tasks`.`add_task_log`(',
QUOTE(task_id),', CONCAT(''Error: '', @p3), NULL, 100, ''ERROR''); ',
' IF @mysql_tasks_initiated <=> initiate_name THEN ',
' SET @mysql_tasks_initiated = NULL; ',
' END IF; ',
'END; ',
'SET ROLE ALL; ',
'SET @task_id =', QUOTE(task_id), '; ',
'IF @mysql_tasks_initiated IS NULL THEN ',
' SET @mysql_tasks_initiated = initiate_name; ',
'END IF; ',
-- synchronization with thread calling end_task_monitor
-- if the event was dropped (not found in information_schema.events)
-- terminate it
'IF (GET_LOCK(lock_name, 60) <=> 1 AND ',
'(SELECT COUNT(*)>0 FROM information_schema.events ise ',
' WHERE CONCAT(mysql_tasks.quote_identifier(ise.event_schema), ',
' ''.'', mysql_tasks.quote_identifier(ise.event_name)) = ',
QUOTE(event_name), ')) THEN ',
' CALL `mysql_tasks`.`add_task_log`(@task_id, ',
' ''Progress monitor started.'', ',
' JSON_OBJECT(''connection_id'', CONNECTION_ID()), 0, ''RUNNING''); ',
'END IF; ',
'WHILE IS_USED_LOCK(lock_name) <=> CONNECTION_ID() DO ',
' DO RELEASE_LOCK(lock_name); ',
'END WHILE; ',
-- if at any time event gets killed, terminate its while loop
'WHILE (SELECT COUNT(*)>0 FROM information_schema.events ise ',
' WHERE CONCAT(mysql_tasks.quote_identifier(ise.event_schema), ',
' ''.'', mysql_tasks.quote_identifier(ise.event_name)) = ',
QUOTE(event_name), ') DO ',
sql_statements,
' DO SLEEP(', refresh_period, '); ',
'END WHILE; ',
'SET @task_id = NULL; ',
'IF @mysql_tasks_initiated <=> initiate_name THEN ',
' SET @mysql_tasks_initiated = NULL; ',
'END IF; ',
'END'
);
PREPARE dynamic_statement FROM @eventSql;
EXECUTE dynamic_statement;
DEALLOCATE PREPARE dynamic_statement;
END IF;
END%%
-- -----------------------------------------------------
-- Procedure `mysql_tasks`.`stop_task_monitor`
-- for internal use
-- -----------------------------------------------------
DROP PROCEDURE IF EXISTS `mysql_tasks`.`stop_task_monitor`%%
CREATE PROCEDURE `mysql_tasks`.`stop_task_monitor`(
IN `event_name` TEXT,
IN `task_id` VARCHAR(36)
)
SQL SECURITY INVOKER
COMMENT '
Stops a task monitor SQL Event.
Parameters:
- event_name: fully qualified name of the SQL Event that runs the
monitoring SQL statements
- task_id: ID of the task for which the task monitor has been started'
BEGIN
DECLARE log_msg JSON DEFAULT NULL;
DECLARE lock_name VARCHAR(64) DEFAULT RIGHT(event_name, 64);
DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN
DO RELEASE_LOCK(lock_name);
END;
IF @mysql_tasks_initiated IS NULL THEN
SIGNAL SQLSTATE 'HY000' SET
MESSAGE_TEXT = 'stop_task_monitor procedure should not be called directly';
END IF;
IF event_name IS NOT NULL THEN
-- synchronization with thread running task_monitor
DO GET_LOCK(lock_name, 60);
SELECT `mysql_tasks`.`find_task_log_msg`(
task_id, 'Progress monitor started.') INTO log_msg;
IF (log_msg IS NOT NULL) THEN
-- due to a race condition, we might not enter here
-- but the task monitor event may be started
SET @killSql = CONCAT(
'KILL ', log_msg->>'$.data.connection_id'
);
PREPARE dynamic_statement FROM @killSql;
EXECUTE dynamic_statement;
DEALLOCATE PREPARE dynamic_statement;
END IF;
-- dropping the event will not kill its active process
SET @eventSql = CONCAT(
'DROP EVENT IF EXISTS ', event_name
);
PREPARE dynamic_statement FROM @eventSql;
EXECUTE dynamic_statement;
DEALLOCATE PREPARE dynamic_statement;
WHILE IS_USED_LOCK(lock_name) <=> CONNECTION_ID() DO
DO RELEASE_LOCK(lock_name);
END WHILE;
END IF;
END%%
-- -----------------------------------------------------
-- EVENT `mysql_tasks`.`task_cleanup`
-- Periodically cleans up data from old tasks.
-- Old tasks must have finished (COMPLETED, CANCELLED)
-- at least 6 days ago.
-- -----------------------------------------------------
DROP EVENT IF EXISTS `mysql_tasks`.`task_cleanup`%%
CREATE EVENT `mysql_tasks`.`task_cleanup` ON SCHEDULE EVERY 1 DAY
ON COMPLETION NOT PRESERVE ENABLE COMMENT 'Clean up old tasks' DO
BEGIN
DECLARE task_ids_to_del JSON DEFAULT NULL;
DECLARE initiate_name TEXT DEFAULT 'task_cleanup';
IF @mysql_tasks_initiated IS NULL THEN
SET @mysql_tasks_initiated = initiate_name;
END IF;
-- find all tasks with last tog time
-- 6 days or older from the current date
SELECT JSON_ARRAYAGG(
BIN_TO_UUID(last_tl.task_id, 1)
) INTO @task_ids_to_del
FROM (
SELECT
task_id,
MAX(log_time) AS last_log_time
FROM
`mysql_tasks`.`task_log_impl`
GROUP BY
task_id
) tl
LEFT OUTER JOIN
`mysql_tasks`.`task_log_impl` last_tl
ON tl.task_id = last_tl.task_id AND tl.last_log_time = last_tl.log_time
JOIN `mysql_tasks`.`task_impl` t
ON tl.task_id = t.id
WHERE (last_tl.status <> 'SCHEDULED' AND last_tl.status <> 'RUNNING')
AND DATE(last_tl.log_time) <= DATE_SUB(CURDATE(), INTERVAL 6 DAY)
AND t.server_uuid = UUID_TO_BIN(@@server_uuid, 1);
-- delete all old tasks
DELETE FROM
`mysql_tasks`.`task_impl`
WHERE BIN_TO_UUID(id, 1) MEMBER OF(@task_ids_to_del);
IF @mysql_tasks_initiated <=> initiate_name THEN
SET @mysql_tasks_initiated = NULL;
END IF;
END%%
-- -----------------------------------------------------
-- EVENT `mysql_tasks`.`task_gc`
-- Periodic garbage collector that cleans up tasks
-- which are in the active state but their
-- process does not exist.
-- -----------------------------------------------------
DROP EVENT IF EXISTS `mysql_tasks`.`task_gc`%%
CREATE EVENT `mysql_tasks`.`task_gc` ON SCHEDULE EVERY 1 MINUTE
ON COMPLETION NOT PRESERVE ENABLE COMMENT 'Garbage collector' DO
BEGIN
DECLARE i, j INT DEFAULT 0;
DECLARE json_id JSON DEFAULT NULL;
DECLARE json_user JSON DEFAULT NULL;
DECLARE json_data JSON DEFAULT NULL;
DECLARE curr_id VARCHAR(36) DEFAULT NULL;
DECLARE curr_user TEXT DEFAULT NULL;
DECLARE curr_data JSON DEFAULT NULL;
DECLARE event_name TEXT DEFAULT NULL;
DECLARE initiate_name TEXT DEFAULT 'task_gc';
IF @mysql_tasks_initiated IS NULL THEN
SET @mysql_tasks_initiated = initiate_name;
END IF;
CREATE TEMPORARY TABLE `mysql_tasks`.`processlist_snapshot` AS
SELECT * FROM `performance_schema`.`processlist`;
-- Find active tasks without alive process
SELECT JSON_ARRAYAGG(
BIN_TO_UUID(t.id, 1)),
JSON_ARRAYAGG(t.mysql_user),
JSON_ARRAYAGG(t.data)
INTO
json_id,
json_user,
json_data
FROM `mysql_tasks`.`task_log_impl` tl
JOIN (
SELECT task_id, MAX(log_time) AS max_log_time, MAX(id) AS max_id
FROM `mysql_tasks`.`task_log_impl`
GROUP BY task_id
) tl2
ON tl.log_time = tl2.max_log_time
JOIN `mysql_tasks`.`task_impl` t
ON tl.task_id = t.id
LEFT JOIN `mysql_tasks`.`processlist_snapshot` p ON t.connection_id = p.id
WHERE
(tl.status <=> 'RUNNING' OR tl.status <=> 'SCHEDULED')
AND p.id IS NULL
AND t.server_uuid <=> UUID_TO_BIN(@@server_uuid, 1)
AND t.data->'$.mysqlMetadata.autoGc' <=> true;
WHILE i < JSON_LENGTH(json_id) DO
SET curr_id = JSON_UNQUOTE(JSON_EXTRACT(json_id, CONCAT('$[', i, ']')));
SET curr_user = JSON_UNQUOTE(JSON_EXTRACT(json_user, CONCAT('$[', i, ']')));
SET curr_data = JSON_EXTRACT(json_data, CONCAT('$[', i, ']'));
IF curr_data IS NOT NULL AND
JSON_CONTAINS_PATH(curr_data, 'one', '$.mysqlMetadata.events') THEN
SET j = 0;
WHILE j < JSON_LENGTH(
JSON_EXTRACT(curr_data, '$.mysqlMetadata.events')) DO
SET event_name = JSON_UNQUOTE(
JSON_EXTRACT(curr_data, CONCAT('$.mysqlMetadata.events[', j, ']')));
CALL `mysql_tasks`.`drop_event`(event_name);
SET j = j + 1;
END WHILE;
END IF;
IF curr_id IS NOT NULL THEN
INSERT INTO `mysql_tasks`.`task_log_impl`(
`id`, `mysql_user`, `task_id`, `log_time`,
`message`, `data`, `progress`, `status`)
VALUES (
UUID_TO_BIN(UUID(), 1), curr_user, UUID_TO_BIN(curr_id, 1), NOW(6),
'Cleaned up by system.', NULL, 100, 'CANCELLED');
END IF;
SELECT i + 1 INTO i;
END WHILE;
-- Find dangling events from tasks which are no longer running
SELECT JSON_ARRAYAGG(BIN_TO_UUID(t.id, 1)), JSON_ARRAYAGG(t.data)
INTO
json_id, json_data
FROM `mysql_tasks`.`task_log_impl` tl
JOIN (
SELECT task_id, MAX(log_time) AS max_log_time, MAX(id) AS max_id
FROM `mysql_tasks`.`task_log_impl`
GROUP BY task_id
) tl2
ON tl.log_time = tl2.max_log_time
JOIN `mysql_tasks`.`task_impl` t
ON tl.task_id = t.id
JOIN information_schema.events ise
ON JSON_UNQUOTE(
t.data->'$.mysqlMetadata.events[0]') =
CONCAT(mysql_tasks.quote_identifier(
ise.EVENT_SCHEMA), '.', mysql_tasks.quote_identifier(ise.EVENT_NAME))
LEFT JOIN `mysql_tasks`.`processlist_snapshot` p ON t.connection_id = p.id
WHERE
(tl.status <> 'RUNNING' AND tl.status <> 'SCHEDULED')
AND p.id IS NULL
AND t.server_uuid = UUID_TO_BIN(@@server_uuid, 1)
AND t.data->'$.mysqlMetadata.autoGc' = true;
WHILE i < JSON_LENGTH(json_id) DO
SET curr_data = JSON_EXTRACT(json_data, CONCAT('$[', i, ']'));
IF curr_data IS NOT NULL AND
JSON_CONTAINS_PATH(curr_data, 'one', '$.mysqlMetadata.events') THEN
SET j = 0;
WHILE j < JSON_LENGTH(
JSON_EXTRACT(curr_data, '$.mysqlMetadata.events')) DO
SET event_name = JSON_UNQUOTE(
JSON_EXTRACT(curr_data, CONCAT('$.mysqlMetadata.events[', j, ']')));
CALL `mysql_tasks`.`drop_event`(event_name);
SET j = j + 1;
END WHILE;
END IF;
SELECT i + 1 INTO i;
END WHILE;
DROP TABLE `mysql_tasks`.`processlist_snapshot`;
IF @mysql_tasks_initiated <=> initiate_name THEN
SET @mysql_tasks_initiated = NULL;
END IF;
END%%
-- -----------------------------------------------------
-- Procedure `mysql_tasks`.`msm_instance_demoted`
-- Disable system events before replication switchover.
-- -----------------------------------------------------
DROP PROCEDURE IF EXISTS `mysql_tasks`.`msm_instance_demoted`%%
CREATE PROCEDURE `mysql_tasks`.`msm_instance_demoted`()
SQL SECURITY INVOKER
COMMENT 'This procedure needs to be called on a primary instance in an InnoDB
Cluster setup before it is demoted to become a secondary.'
BEGIN
ALTER EVENT `mysql_tasks`.`task_cleanup` DISABLE;
ALTER EVENT `mysql_tasks`.`task_gc` DISABLE;
END%%
-- -----------------------------------------------------
-- Procedure `mysql_tasks`.`msm_instance_promoted`
-- Enable system events after replication switchover/failover.
-- -----------------------------------------------------
DROP PROCEDURE IF EXISTS `mysql_tasks`.`msm_instance_promoted`%%
CREATE PROCEDURE `mysql_tasks`.`msm_instance_promoted`()
SQL SECURITY INVOKER
COMMENT 'This procedure needs to be called on an instance in an InnoDB
Cluster setup when it is promoted to become the primary.'
BEGIN
ALTER EVENT `mysql_tasks`.`task_cleanup` ENABLE;
ALTER EVENT `mysql_tasks`.`task_gc` ENABLE;
END%%
DELIMITER ;
-- #############################################################################
-- MSM Section 170: Authorization
-- -----------------------------------------------------------------------------
-- This section is used to define the ROLEs and GRANT statements.
-- #############################################################################
-- Create ROLEs and assign privileges using GRANT statements
-- The mysql_task_admin ROLE allows to fully manage the MySQL tasks schema
-- The mysql_task_user ROLE allows to create and work with MySQL tasks
CREATE ROLE IF NOT EXISTS 'mysql_task_admin', 'mysql_task_user';
-- GRANTS for mysql_task_admin
GRANT SELECT, INSERT, DELETE, EXECUTE ON `mysql_tasks`.* TO
'mysql_task_admin';
GRANT SELECT ON `performance_schema`.`events_statements_current` TO
'mysql_task_admin';
GRANT SELECT ON `performance_schema`.`global_variables` TO 'mysql_task_admin';
-- GRANTS for mysql_task_user
GRANT SELECT ON `mysql_tasks`.`config` TO 'mysql_task_user';
GRANT SELECT ON `mysql_tasks`.`msm_schema_version` TO 'mysql_task_user';
GRANT INSERT ON `mysql_tasks`.`task_i` TO 'mysql_task_user';
GRANT INSERT ON `mysql_tasks`.`task_log_i` TO 'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`task_list` TO 'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`task` TO 'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`task_logs` TO 'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`task_status_list` TO 'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`task_status` TO 'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`task_status_brief` TO
'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`active_task_count` TO
'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`find_task_log_msg` TO
'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_task_ids_from_alias` TO
'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_task_id` TO 'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_task_alias` TO 'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_task_log_data_json_schema` TO
'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_task_connection_id` TO
'mysql_task_user';
GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`create_task` TO 'mysql_task_user';
GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`create_task_with_id` TO
'mysql_task_user';
GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`add_task_log` TO 'mysql_task_user';
GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`kill_task` TO 'mysql_task_user';
GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`drop_event` TO 'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`app_task_list` TO 'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`app_task` TO 'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`app_task_logs` TO 'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`app_task_status_list` TO
'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`app_task_status` TO 'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`app_task_status_brief` TO
'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_app_task_ids_from_alias` TO
'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_app_task_id` TO 'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_app_task_alias` TO
'mysql_task_user';
GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`create_app_task` TO 'mysql_task_user';
GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`create_app_task_with_id` TO
'mysql_task_user';
GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`kill_app_task` TO 'mysql_task_user';
GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`execute_prepared_stmt_from_app_async`
TO 'mysql_task_user';
GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`execute_prepared_stmt_async` TO
'mysql_task_user';
GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`start_task_monitor` TO
'mysql_task_user';
GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`stop_task_monitor` TO
'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`quote_identifier` TO 'mysql_task_user';
GRANT EXECUTE ON FUNCTION `mysql_tasks`.`extract_username` TO 'mysql_task_user';
GRANT SELECT ON `performance_schema`.`events_statements_current` TO
'mysql_task_user';
-- #############################################################################
-- MSM Section 910: Database Schema Version
-- -----------------------------------------------------------------------------
-- Setting the correct database schema version.
-- #############################################################################
CREATE OR REPLACE SQL SECURITY INVOKER
VIEW `mysql_tasks`.`msm_schema_version` (
`major`,`minor`,`patch`) AS
SELECT 3, 0, 0;
-- #############################################################################
-- MSM Section 920: Server Variable Restoration
-- -----------------------------------------------------------------------------
-- Restore the modified server variables to their original state.
-- #############################################################################
SET SQL_MODE=@OLD_SQL_MODE;
SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS;
SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS;