mysql-test/suite/router/include/schema/mysql_tasks.sql (2,047 lines of code) (raw):

# ############################################################################# # MSM Section 001: Database Schema Create Script # ----------------------------------------------------------------------------- # This script creates the `mysql_tasks` database schema version 3.0.0 # ----------------------------------------------------------------------------- # ############################################################################# # MSM Section 010: Server Variable Settings # ----------------------------------------------------------------------------- # Set server variables, remember their state to be able to restore accordingly. # ----------------------------------------------------------------------------- SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0; SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0; SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,' 'NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,' 'NO_ENGINE_SUBSTITUTION'; # ############################################################################# # MSM Section 110: Database Schema Creation # ----------------------------------------------------------------------------- # CREATE SCHEMA statement. # ----------------------------------------------------------------------------- CREATE SCHEMA IF NOT EXISTS `mysql_tasks` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci; # ############################################################################# # MSM Section 120: Database Schema Version Creation Indication # ----------------------------------------------------------------------------- # Create the `${schema_name}`.`msm_schema_version` VIEW and initialize it with # the version 0, 0, 0 which indicates the ongoing creation processes of the # `${schema_name}` database schema. # ----------------------------------------------------------------------------- CREATE OR REPLACE SQL SECURITY INVOKER VIEW `mysql_tasks`.`msm_schema_version` ( `major`,`minor`,`patch` ) AS SELECT 0, 0, 0; # ############################################################################# # MSM Section 130: Creation of Helpers # ----------------------------------------------------------------------------- # Definitions of optional helper PROCEDUREs and FUNCTIONs that are called # during the creations of the database schema. # ----------------------------------------------------------------------------- DELIMITER %%; # Insert optional helper PROCEDUREs and FUNCTIONs here DELIMITER ;%% # ############################################################################# # MSM Section 140: Non-idempotent Schema Objects # ----------------------------------------------------------------------------- # This section contains creation of schema TABLEs and the initialization of # base data (standard INSERTs). It is important to note that all other # schema objects (VIEWs, PROCEDUREs, FUNCTIONs, TRIGGERs EVENTs, ...) need to # be created inside the MSM Section: idempotent schema object changes. # ----------------------------------------------------------------------------- # CREATE TABLE statements and standard INSERTs. # ----------------------------------------------------------------------------- # ----------------------------------------------------- # Table `mysql_tasks`.`config` # for internal use # ----------------------------------------------------- CREATE TABLE IF NOT EXISTS `mysql_tasks`.`config` ( `id` TINYINT NOT NULL DEFAULT 1, `data` JSON NULL, PRIMARY KEY (`id`), CONSTRAINT Config_OnlyOneRow CHECK (id = 1)) ENGINE = InnoDB; # cSpell:ignore Lakehouse INSERT IGNORE INTO `mysql_tasks`.`config` (`id`, `data`) VALUES (1, '{ "limits": { "maximumPreparedStmtAsyncTasks": 100, "maximumHeatwaveLoadingTasks": 5 } }' ); # ----------------------------------------------------- # Table `mysql_tasks`.`task_impl` # for internal use # ----------------------------------------------------- CREATE TABLE IF NOT EXISTS `mysql_tasks`.`task_impl` ( `id` BINARY(16) NOT NULL COMMENT 'A UUID uniquely identifying the task across replication instances in binary format. The id should be created by the function call UUID_TO_BIN(UUID(), 1) which generates the BINARY representation of the UUID in reverse order to improve indexing. The field is usually hidden from end users.', `mysql_user` VARCHAR(288) DEFAULT (CURRENT_USER()) COMMENT 'The MySQL user that created the task.', `app_user_id` VARCHAR(255) COMMENT 'An optional ID representing a specific application user. If set, the app_user_id will be used to filter tasks per application users, preventing an app user to see tasks from other app users.', `alias` VARCHAR(16) COMMENT 'A human readable alias that allows easier referencing of a specific task. It uses the format {Abbreviated weekday name}-{task count per mysql_user or app_user_id if specified}, e.g. Mon-1, Mon-2, Tue-1, etc. Please note that there is no guarantee that the alias will be unique, while still being useful in the majority of cases as old task are deleted after 6 days.', `name` VARCHAR(255) NOT NULL COMMENT 'The name of the task.', `server_uuid` BINARY(16) NOT NULL COMMENT 'The UUID of the server on which the task has been created. It should be populated using UUID_TO_BIN(@@server_uuid, 1).', `connection_id` BIGINT UNSIGNED NOT NULL COMMENT 'The MySQL server connection_id that was used to created the task.', `task_type` VARCHAR(80) NOT NULL COMMENT 'An application defined task type, used for filtering of tasks per type.', `data` JSON COMMENT 'Can hold application specific data.', `data_json_schema` JSON COMMENT 'A JSON schema defining the structure of the data field for the given task.', `log_data_json_schema` JSON COMMENT 'A JSON schema defining the structure of the task_log_impl.data field for the given task.', PRIMARY KEY(`id`), INDEX(`mysql_user`(192)), INDEX(`mysql_user`(176), `alias`), INDEX(`task_type`) ) ENGINE=InnoDB; # ----------------------------------------------------- # Table `mysql_tasks`.`task_log_impl` # for internal use # ----------------------------------------------------- CREATE TABLE IF NOT EXISTS `mysql_tasks`.`task_log_impl` ( `id` BINARY(16) NOT NULL COMMENT 'A UUID uniquely identifying a task log entry. It should be created by the function call UUID_TO_BIN(UUID(), 1) which generates the BINARY representation of the UUID in the reverse order to improve indexing.', `task_id` BINARY(16) NOT NULL COMMENT 'The task ID (foreign key).', `mysql_user` VARCHAR(288) DEFAULT (CURRENT_USER()) COMMENT 'The MySQL user that created the task / inserted the task log.', `log_time` TIMESTAMP(6) NOT NULL COMMENT 'A timestamp when the task log entry was inserted.', `message` VARCHAR(2000) COMMENT 'A task log message.', `data` JSON COMMENT 'Can hold application specific log data. It must conform to log_data_json_schema defined in the`task_impl` table.', `progress` SMALLINT NOT NULL DEFAULT 0 COMMENT 'A task completion progress between 0 and 100%.', `status` ENUM('SCHEDULED', 'RUNNING', 'COMPLETED', 'ERROR', 'CANCELLED') DEFAULT 'SCHEDULED' COMMENT 'The task state. When created, a task goes in the SCHEDULED state, then is moved to RUNNING and finally COMPLETED state. In case of ERROR, the task status becomes ERROR. When task is killed by the user or by the garbage collector, it gets the CANCELLED status.', PRIMARY KEY(`id`), INDEX(`mysql_user`(192)), INDEX(`log_time`), INDEX(`status`), CONSTRAINT `fk_task_log_task_id` FOREIGN KEY (`task_id`) REFERENCES `mysql_tasks`.`task_impl` (`id`) ) ENGINE=InnoDB; # ############################################################################# # MSM Section 150: Idempotent Schema Objects # ----------------------------------------------------------------------------- # This section contains the creation of all schema objects except TABLEs. # All objects must be created under the assumption that an object of the same # name is already present. Therefore explicit DROP IF EXISTS statements or # CREATE OR REPLACE clauses must be used when creating the objects. # ----------------------------------------------------------------------------- # All other schema object definitions (VIEWS, PROCEDUREs, FUNCTIONs, TRIGGERs, # EVENTS, ...). # ----------------------------------------------------------------------------- DELIMITER %%; # ----------------------------------------------------- # Trigger `mysql_tasks`.`task_impl_BEFORE_INSERT` # Populate server_uuid and alias on insert # ----------------------------------------------------- DROP TRIGGER IF EXISTS `mysql_tasks`.`task_impl_BEFORE_INSERT`%% CREATE TRIGGER `mysql_tasks`.`task_impl_BEFORE_INSERT` BEFORE INSERT ON `mysql_tasks`.`task_impl` FOR EACH ROW BEGIN DECLARE day_abbr VARCHAR(3); DECLARE max_index INT UNSIGNED; -- Get the abbreviated day of the week SET day_abbr = DATE_FORMAT(CURDATE(), '%a'); -- Find the next free index for the given user SELECT /*+ SET_VAR(use_secondary_engine=off) */ IFNULL( MAX( CAST(SUBSTRING_INDEX(alias, '-', -1) AS UNSIGNED) ), 0 ) + 1 INTO max_index FROM `mysql_tasks`.`task_impl` WHERE `mysql_user` = NEW.mysql_user AND ( (NEW.app_user_id IS NOT NULL AND `app_user_id` = NEW.app_user_id) OR (NEW.app_user_id IS NULL AND `app_user_id` IS NULL) ) AND alias LIKE CONCAT(day_abbr, '-%'); -- Set the alias (if not set) SET NEW.`alias` = COALESCE(NEW.`alias`, CONCAT(day_abbr, '-', max_index)); -- Set the server uuid (if not set) SET NEW.`server_uuid` = COALESCE(NEW.`server_uuid`, UUID_TO_BIN(@@server_uuid, 1)); END%% # ----------------------------------------------------- # Trigger `mysql_tasks`.`task_impl_BEFORE_DELETE` # ----------------------------------------------------- DROP TRIGGER IF EXISTS `mysql_tasks`.`task_impl_BEFORE_DELETE`%% CREATE TRIGGER `mysql_tasks`.`task_impl_BEFORE_DELETE` BEFORE DELETE ON `mysql_tasks`.`task_impl` FOR EACH ROW BEGIN DELETE FROM `mysql_tasks`.`task_log_impl` WHERE `task_id` = OLD.`id`; END%% DELIMITER ;%% # ----------------------------------------------------- # View `mysql_tasks`.`task_i` # Useful for inserts. Prevents an invoker from setting # a different mysql_user for a record. # Note: grant only INSERT on this view to users # (not SELECT) # for internal use # ----------------------------------------------------- DROP VIEW IF EXISTS `mysql_tasks`.`task_i`; CREATE SQL SECURITY DEFINER VIEW `mysql_tasks`.`task_i` AS SELECT `task_impl`.`id` AS `id`, `task_impl`.`app_user_id` AS `app_user_id`, `task_impl`.`alias` AS `alias`, `task_impl`.`name` AS `name`, `task_impl`.`server_uuid` AS `server_uuid`, `task_impl`.`connection_id` AS `connection_id`, `task_impl`.`task_type` AS `task_type`, `task_impl`.`data` AS `data`, `task_impl`.`data_json_schema` AS `data_json_schema`, `task_impl`.`log_data_json_schema` AS `log_data_json_schema` FROM `mysql_tasks`.`task_impl` WHERE (LEFT( `mysql_user`, (LENGTH(`mysql_user`) - locate('@', reverse(`mysql_user`))) ) = LEFT(user(),(length(user()) - locate('@', reverse(user()))))); # ----------------------------------------------------- # View `mysql_tasks`.`task_log_i` # Useful for inserts. Prevents an invoker from setting # a different mysql_user for a record. # Note: grant only INSERT on this view to users # (not SELECT) # for internal use # ----------------------------------------------------- DROP VIEW IF EXISTS `mysql_tasks`.`task_log_i`; CREATE SQL SECURITY DEFINER VIEW `mysql_tasks`.`task_log_i` AS SELECT `task_log_impl`.`id` AS `id`, `task_log_impl`.`task_id` AS `task_id`, `task_log_impl`.`log_time` AS `log_time`, `task_log_impl`.`message` AS `message`, `task_log_impl`.`data` AS `data`, `task_log_impl`.`progress` AS `progress`, `task_log_impl`.`status` AS `status` FROM `mysql_tasks`.`task_log_impl` WHERE (LEFT( `mysql_user`, (LENGTH(`mysql_user`) - LOCATE('@', REVERSE(`mysql_user`))) ) = LEFT(user(),(length(user()) - LOCATE('@', REVERSE(user()))))); # ----------------------------------------------------- # View `mysql_tasks`.`task_status_impl` # for internal use # ----------------------------------------------------- DROP VIEW IF EXISTS `mysql_tasks`.`task_status_impl`; CREATE SQL SECURITY INVOKER VIEW `mysql_tasks`.`task_status_impl` AS SELECT t.id AS id, t.app_user_id AS app_user_id, t.alias AS alias, t.server_uuid AS server_uuid, t.name AS name, t.mysql_user AS mysql_user, t.connection_id AS connection_id, t.task_type AS task_type, t.data AS data, last_tl.data AS log_data, IF ( t.server_uuid = server_info.server_uuid, last_tl.message, 'Not started on this server.' ) AS message, IF ( t.server_uuid = server_info.server_uuid, last_tl.progress, 0 ) AS progress, IF ( t.server_uuid = server_info.server_uuid, last_tl.status, 'EXTERNAL' ) AS status, CAST(tl.first_log_time AS DATETIME) AS 'scheduled_time', IF ( t.server_uuid = server_info.server_uuid, CAST(tl_running.first_log_time AS DATETIME), NULL ) AS 'starting_time', IF ( t.server_uuid = server_info.server_uuid, IF( last_tl.progress = 0, NULL, CAST( TIMESTAMPADD( SECOND, ROUND( TIMESTAMPDIFF( SECOND, tl_running.first_log_time, tl.last_log_time ) / last_tl.progress * 100 ), tl_running.first_log_time ) AS DATETIME ) ), NULL ) AS 'estimated_completion_time', IF ( t.server_uuid = server_info.server_uuid, IF( last_tl.progress = 0, NULL, TIMESTAMPDIFF( SECOND, tl_running.first_log_time, tl.last_log_time ) / last_tl.progress * 100.0 - TIMESTAMPDIFF( SECOND, tl_running.first_log_time, tl.last_log_time ) ), NULL ) AS 'estimated_remaining_time', IF ( t.server_uuid = server_info.server_uuid, CONCAT( REPEAT('#', FLOOR(last_tl.progress / 10)), REPEAT('_', 10 - FLOOR(last_tl.progress / 10)) ), REPEAT('_', 10) ) AS 'progress_bar' FROM `mysql_tasks`.`task_impl` t INNER JOIN ( SELECT tl1.task_id, MIN(tl1.log_time) AS first_log_time, MAX(tl1.log_time) AS last_log_time, COUNT(*) AS log_count FROM `mysql_tasks`.`task_log_impl` tl1 GROUP BY tl1.task_id ) tl ON t.id = tl.task_id LEFT OUTER JOIN `mysql_tasks`.`task_log_impl` last_tl ON tl.task_id = last_tl.task_id AND tl.last_log_time = last_tl.log_time LEFT OUTER JOIN ( SELECT tl2.task_id, MIN(tl2.log_time) AS first_log_time FROM `mysql_tasks`.`task_log_impl` tl2 WHERE status = 'RUNNING' GROUP BY tl2.task_id ) tl_running ON t.id = tl_running.task_id JOIN ( SELECT UUID_TO_BIN(VARIABLE_VALUE, 1) AS server_uuid FROM performance_schema.global_variables WHERE VARIABLE_NAME = 'server_uuid' ) server_info ON 1 = 1; DELIMITER %%; # ----------------------------------------------------- # Function `mysql_tasks`.`app_task_list` # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`app_task_list`%% CREATE FUNCTION `mysql_tasks`.`app_task_list`( `app_user_id` VARCHAR(255), `task_type` VARCHAR(80), `offset` INT UNSIGNED, `limit` INT UNSIGNED ) RETURNS JSON READS SQL DATA SQL SECURITY DEFINER COMMENT ' Returns a paginated list of application tasks. Parameters: - app_user_id: application user id to filter the list on - task_type: type to filter on, if NULL returns all types - offset: pagination offset - limit: pagination limit' BEGIN DECLARE tasks JSON DEFAULT NULL; IF `task_type` IS NULL THEN SET `task_type` = '%'; END IF; IF `offset` IS NULL THEN SET `offset` = 0; END IF; IF `limit` IS NULL THEN SET `limit` = 20; END IF; SELECT /*+ SET_VAR(use_secondary_engine=off) */ JSON_ARRAYAGG( JSON_OBJECT( 'id', BIN_TO_UUID(t.id, 1), 'name', t.name, 'connection_id', t.connection_id, 'task_type', t.task_type, 'data', t.data )) INTO tasks FROM ( SELECT t1.id, t1.name, t1.connection_id, t1.task_type, t1.data FROM `mysql_tasks`.`task_impl` t1 WHERE (LEFT( t1.`mysql_user`, (LENGTH(t1.`mysql_user`) - locate('@', reverse(t1.`mysql_user`))) ) = LEFT(user(),(length(user()) - locate('@', reverse(user()))))) AND (`app_user_id` IS NULL OR t1.app_user_id = `app_user_id`) AND t1.task_type LIKE `task_type` ORDER BY t1.id DESC LIMIT `limit` OFFSET `offset` ) t; RETURN tasks; END%% # ----------------------------------------------------- # Function `mysql_tasks`.`task_list` # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`task_list`%% CREATE FUNCTION `mysql_tasks`.`task_list`( `task_type` VARCHAR(80), `offset` INT UNSIGNED, `limit` INT UNSIGNED ) RETURNS JSON READS SQL DATA SQL SECURITY INVOKER COMMENT ' Returns a paginated list of tasks Parameters: - task_type: type to filter on, if NULL returns all types - offset: pagination offset - limit: pagination limit' BEGIN RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`app_task_list`(NULL, `task_type`, `offset`, `limit`) ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`app_task` # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`app_task`%% CREATE FUNCTION `mysql_tasks`.`app_task`( app_user_id VARCHAR(255), id_or_alias VARCHAR(36) ) RETURNS JSON READS SQL DATA SQL SECURITY DEFINER COMMENT ' Returns information about a single application task Parameters: - app_user_id: application user id to filter the list on - id_or_alias: task UUID or its unique alias' BEGIN DECLARE tasks JSON DEFAULT NULL; DECLARE task_id VARCHAR(36); SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`get_task_id`(id_or_alias) INTO task_id; SELECT /*+ SET_VAR(use_secondary_engine=off) */ JSON_OBJECT( 'id', BIN_TO_UUID(t.id, 1), 'alias', t.alias, 'name', t.name, 'connection_id', t.connection_id, 'task_type', t.task_type, 'data', t.data, 'data_json_schema', t.data_json_schema, 'log_data_json_schema', t.log_data_json_schema ) INTO tasks FROM `mysql_tasks`.`task_impl` t WHERE (LEFT( t.`mysql_user`, (LENGTH(t.`mysql_user`) - locate('@', reverse(t.`mysql_user`))) ) = LEFT(user(),(length(user()) - locate('@', reverse(user()))))) AND (`app_user_id` IS NULL OR t.app_user_id = `app_user_id`) AND t.id = UUID_TO_BIN(`task_id`, 1); RETURN tasks; END%% # ----------------------------------------------------- # Function `mysql_tasks`.`task` # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`task`%% CREATE FUNCTION `mysql_tasks`.`task`( id_or_alias VARCHAR(36) ) RETURNS JSON READS SQL DATA SQL SECURITY DEFINER COMMENT ' Returns information about a single task Parameters: - id_or_alias: task UUID or its unique alias' BEGIN RETURN ( SELECT `mysql_tasks`.`app_task`(NULL, id_or_alias) ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`app_task_logs` # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`app_task_logs`%% CREATE FUNCTION `mysql_tasks`.`app_task_logs`( app_user_id VARCHAR(255), id_or_alias VARCHAR(36), newer_than_log_time TIMESTAMP(6) ) RETURNS JSON READS SQL DATA SQL SECURITY DEFINER COMMENT ' Returns a list of logs belonging to an application task Parameters: - app_user_id: application user id to filter the list on - id_or_alias: task UUID or its unique alias - newer_than_log_time: if not NULL, only return log entries newer than the specified timestamp' BEGIN DECLARE task_logs JSON DEFAULT NULL; DECLARE task_id VARCHAR(36); SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`get_task_id`(id_or_alias) INTO task_id; SELECT /*+ SET_VAR(use_secondary_engine=off) */ JSON_ARRAYAGG( JSON_OBJECT( 'id', BIN_TO_UUID(tl.id, 1), 'task_id', BIN_TO_UUID(tl.task_id, 1), 'log_time', tl.log_time, 'message', tl.message, 'data', tl.data, 'progress', tl.progress, 'status', tl.status )) INTO task_logs FROM `mysql_tasks`.`task_log_impl` tl JOIN `mysql_tasks`.`task_impl` t ON tl.task_id = t.id WHERE (LEFT( tl.`mysql_user`, (LENGTH(tl.`mysql_user`) - locate('@', reverse(tl.`mysql_user`))) ) = LEFT(user(),(length(user()) - locate('@', reverse(user()))))) AND (`app_user_id` IS NULL OR t.app_user_id = `app_user_id`) AND tl.task_id = UUID_TO_BIN(`task_id`, 1) AND tl.log_time > COALESCE(unix_timestamp(newer_than_log_time), '1970-01-01') ORDER BY tl.log_time DESC; RETURN task_logs; END%% # ----------------------------------------------------- # Function `mysql_tasks`.`task_logs` # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`task_logs`%% CREATE FUNCTION `mysql_tasks`.`task_logs`( id_or_alias VARCHAR(36), newer_than_log_time TIMESTAMP(6) ) RETURNS JSON READS SQL DATA SQL SECURITY DEFINER COMMENT ' Returns a list of logs belonging to a task Parameters: - id_or_alias: task UUID or its unique alias - newer_than_log_time: if not NULL, only return log entries newer than the specified timestamp' BEGIN RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`app_task_logs`(NULL, id_or_alias, newer_than_log_time) ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`app_task_status_list` # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`app_task_status_list`%% CREATE FUNCTION `mysql_tasks`.`app_task_status_list`( `app_user_id` VARCHAR(255), `task_type` VARCHAR(80), `offset` INT UNSIGNED, `limit` INT UNSIGNED ) RETURNS JSON READS SQL DATA SQL SECURITY DEFINER COMMENT ' Returns a paginated list of application task statuses Parameters: - app_user_id: application user id to filter the list on - task_type: type to filter on, if NULL returns all types - offset: pagination offset - limit: pagination limit' BEGIN IF `task_type` IS NULL THEN SET `task_type` = '%'; END IF; IF `offset` IS NULL THEN SET `offset` = 0; END IF; IF `limit` IS NULL THEN SET `limit` = 20; END IF; RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ JSON_ARRAYAGG( JSON_OBJECT( 'id', BIN_TO_UUID(sq.id, 1), 'alias', sq.alias, 'server_uuid', BIN_TO_UUID(sq.server_uuid, 1), 'name', sq.name, 'task_type', sq.task_type, 'task_data', sq.data, 'data', sq.log_data, 'message', sq.message, 'progress', sq.progress, 'status', sq.status, 'scheduled_time', sq.scheduled_time, 'starting_time', sq.starting_time, 'estimated_completion_time', sq.estimated_completion_time, 'estimated_remaining_time', sq.estimated_remaining_time, 'progress_bar', sq.progress_bar, 'row_hash', MD5(CONCAT_WS(',', sq.status, sq.message, sq.progress, sq.data, sq.log_data, sq.scheduled_time, sq.starting_time, sq.estimated_completion_time, sq.estimated_remaining_time ) ) ) ) FROM ( SELECT * FROM `mysql_tasks`.`task_status_impl` tsi WHERE LEFT( tsi.mysql_user, LENGTH(tsi.mysql_user) - LOCATE('@', REVERSE(tsi.mysql_user)) ) = LEFT( SESSION_USER(), LENGTH(SESSION_USER()) - LOCATE('@', REVERSE(SESSION_USER())) ) AND (`app_user_id` IS NULL OR tsi.app_user_id = `app_user_id`) AND tsi.task_type LIKE `task_type` ORDER BY tsi.id DESC LIMIT `limit` OFFSET `offset` ) sq ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`task_status_list` # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`task_status_list`%% CREATE FUNCTION `mysql_tasks`.`task_status_list`( `task_type` VARCHAR(80), `offset` INT UNSIGNED, `limit` INT UNSIGNED ) RETURNS JSON READS SQL DATA SQL SECURITY DEFINER COMMENT ' Returns a paginated list of task statuses Parameters: - task_type: type to filter on, if NULL returns all types - offset: pagination offset - limit: pagination limit' BEGIN RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`app_task_status_list`(NULL, `task_type`, `offset`, `limit`) ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`app_task_status` # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`app_task_status`%% CREATE FUNCTION `mysql_tasks`.`app_task_status`( app_user_id VARCHAR(255), id_or_alias VARCHAR(36) ) RETURNS JSON READS SQL DATA SQL SECURITY DEFINER COMMENT ' Returns status of an application task Parameters: - app_user_id: application user id to filter the list on - id_or_alias: task UUID or its unique alias' BEGIN DECLARE task_id VARCHAR(36); SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`get_task_id`(id_or_alias) INTO task_id; RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ JSON_OBJECT( 'id', BIN_TO_UUID(sq.id, 1), 'alias', sq.alias, 'server_uuid', BIN_TO_UUID(sq.server_uuid, 1), 'name', sq.name, 'connection_id', sq.connection_id, 'task_type', sq.task_type, 'task_data', sq.data, 'data', sq.log_data, 'message', sq.message, 'progress', sq.progress, 'status', sq.status, 'scheduled_time', sq.scheduled_time, 'starting_time', sq.starting_time, 'estimated_completion_time', sq.estimated_completion_time, 'estimated_remaining_time', sq.estimated_remaining_time, 'progress_bar', sq.progress_bar, 'row_hash', MD5(CONCAT_WS(',', sq.status, sq.message, sq.progress, sq.data, sq.log_data, sq.scheduled_time, sq.starting_time, sq.estimated_completion_time, sq.estimated_remaining_time ) ) ) FROM ( SELECT * FROM `mysql_tasks`.`task_status_impl` tsi WHERE LEFT( tsi.mysql_user, LENGTH(tsi.mysql_user) - LOCATE('@', REVERSE(tsi.mysql_user)) ) = LEFT( SESSION_USER(), LENGTH(SESSION_USER()) - LOCATE('@', REVERSE(SESSION_USER())) ) AND (`app_user_id` IS NULL OR tsi.app_user_id = `app_user_id`) AND tsi.id = UUID_TO_BIN(`task_id`, 1) ) sq ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`task_status` # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`task_status`%% CREATE FUNCTION `mysql_tasks`.`task_status`( id_or_alias VARCHAR(36) ) RETURNS JSON READS SQL DATA SQL SECURITY DEFINER COMMENT ' Returns task status Parameters: - id_or_alias: task UUID or its unique alias' BEGIN RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`app_task_status`(NULL, id_or_alias) ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`app_task_status_brief` # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`app_task_status_brief`%% CREATE FUNCTION `mysql_tasks`.`app_task_status_brief`( app_user_id VARCHAR(255), id_or_alias VARCHAR(36) ) RETURNS JSON READS SQL DATA SQL SECURITY DEFINER COMMENT ' Returns a brief status of an application task Parameters: - app_user_id: application user id to filter the list on - id_or_alias: task UUID or its unique alias' BEGIN DECLARE task_id VARCHAR(36); SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`get_task_id`(id_or_alias) INTO task_id; RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ JSON_OBJECT( 'data', sq.log_data, 'message', sq.message, 'progress', sq.progress, 'status', sq.status ) FROM ( SELECT * FROM `mysql_tasks`.`task_status_impl` tsi WHERE LEFT( tsi.mysql_user, LENGTH(tsi.mysql_user) - LOCATE('@', REVERSE(tsi.mysql_user)) ) = LEFT( SESSION_USER(), LENGTH(SESSION_USER()) - LOCATE('@', REVERSE(SESSION_USER())) ) AND (`app_user_id` IS NULL OR tsi.app_user_id = `app_user_id`) AND tsi.id = UUID_TO_BIN(`task_id`, 1) LIMIT 1 ) sq ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`task_status_brief` # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`task_status_brief`%% CREATE FUNCTION `mysql_tasks`.`task_status_brief`( id_or_alias VARCHAR(36) ) RETURNS JSON READS SQL DATA SQL SECURITY DEFINER COMMENT ' Returns a brief task status Parameters: - id_or_alias: task UUID or its unique alias ' BEGIN RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`app_task_status_brief`(NULL, id_or_alias) ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`find_task_log_msg` # for internal use # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`find_task_log_msg`%% CREATE FUNCTION `mysql_tasks`.`find_task_log_msg`( task_id VARCHAR(36), log_msg TEXT ) RETURNS JSON READS SQL DATA SQL SECURITY DEFINER BEGIN DECLARE task_log JSON DEFAULT NULL; SELECT /*+ SET_VAR(use_secondary_engine=off) */ JSON_OBJECT( 'id', BIN_TO_UUID(tl.id, 1), 'log_time', tl.log_time, 'data', tl.data, 'progress', tl.progress, 'status', tl.status ) INTO task_log FROM `mysql_tasks`.`task_log_impl` tl WHERE tl.task_id = UUID_TO_BIN(`task_id`, 1) AND tl.message = log_msg; RETURN task_log; END%% # ----------------------------------------------------- # Function `mysql_tasks`.`get_task_log_data_json_schema` # for internal use # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`get_task_log_data_json_schema`%% CREATE FUNCTION `mysql_tasks`.`get_task_log_data_json_schema`( task_id VARCHAR(36) ) RETURNS JSON READS SQL DATA SQL SECURITY DEFINER BEGIN RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ t.log_data_json_schema FROM `mysql_tasks`.`task_impl` t WHERE t.id = UUID_TO_BIN(`task_id`, 1) ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`get_task_connection_id` # for internal use # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`get_task_connection_id`%% CREATE FUNCTION `mysql_tasks`.`get_task_connection_id`( task_id VARCHAR(36) ) RETURNS BIGINT UNSIGNED READS SQL DATA SQL SECURITY DEFINER BEGIN RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ t.connection_id FROM `mysql_tasks`.`task_impl` t WHERE t.id = UUID_TO_BIN(`task_id`, 1) ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`active_task_count` # for internal use # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`active_task_count`%% CREATE FUNCTION `mysql_tasks`.`active_task_count`(task_type VARCHAR(80)) RETURNS INT UNSIGNED READS SQL DATA SQL SECURITY DEFINER BEGIN RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ COUNT(active_task.task_id) FROM ( SELECT DISTINCT(tl.task_id) AS task_id FROM `mysql_tasks`.`task_log_impl` tl JOIN `mysql_tasks`.`task_impl` t ON tl.task_id = t.id WHERE tl.status IN ('RUNNING', 'SCHEDULED') AND (t.task_type = task_type OR task_type IS NULL) AND tl.task_id NOT IN ( SELECT DISTINCT(tli.task_id) FROM `mysql_tasks`.`task_log_impl` tli WHERE tli.status IN ('COMPLETED', 'ERROR', 'CANCELLED') ) ) active_task ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`active_user_task_count` # for internal use # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`active_user_task_count`%% CREATE FUNCTION `mysql_tasks`.`active_user_task_count`(task_type VARCHAR(80)) RETURNS INT UNSIGNED READS SQL DATA SQL SECURITY DEFINER BEGIN RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ COUNT(active_task.task_id) FROM ( SELECT DISTINCT(tl.task_id) AS task_id FROM `mysql_tasks`.`task_log_impl` tl JOIN `mysql_tasks`.`task_impl` t ON tl.task_id = t.id WHERE LEFT( t.mysql_user, LENGTH(t.mysql_user) - LOCATE('@', REVERSE(t.mysql_user)) ) = LEFT( SESSION_USER(), LENGTH(SESSION_USER()) - LOCATE('@', REVERSE(SESSION_USER())) ) AND tl.status IN ('RUNNING', 'SCHEDULED') AND (t.task_type = task_type OR task_type IS NULL) AND tl.task_id NOT IN ( SELECT DISTINCT(tli.task_id) FROM `mysql_tasks`.`task_log_impl` tli WHERE tli.status IN ('COMPLETED', 'ERROR', 'CANCELLED') ) ) active_task ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`get_app_task_ids_from_alias` # for internal use # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`get_app_task_ids_from_alias`%% CREATE FUNCTION `mysql_tasks`.`get_app_task_ids_from_alias`( app_user_id VARCHAR(255), alias VARCHAR(16) ) RETURNS JSON READS SQL DATA SQL SECURITY DEFINER COMMENT ' Returns a list of the task UUIDs with a given alias, belonging to a given application user. Parameters: - app_user_id: application user id to filter the list on - alias: task alias' BEGIN RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ JSON_ARRAYAGG( BIN_TO_UUID(t.id, 1) ) FROM `mysql_tasks`.`task_impl` t WHERE LEFT( t.mysql_user, LENGTH(t.mysql_user) - LOCATE('@', REVERSE(t.mysql_user)) ) = LEFT( SESSION_USER(), LENGTH(SESSION_USER()) - LOCATE('@', REVERSE(SESSION_USER())) ) AND (`app_user_id` IS NULL OR t.app_user_id = `app_user_id`) AND t.alias = alias ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`get_task_ids_from_alias` # for internal use # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`get_task_ids_from_alias`%% CREATE FUNCTION `mysql_tasks`.`get_task_ids_from_alias`( alias VARCHAR(16) ) RETURNS JSON READS SQL DATA SQL SECURITY DEFINER COMMENT ' Returns a list of the task UUIDs with a given alias. Parameters: - alias: task alias' BEGIN RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`get_app_task_ids_from_alias`(NULL, alias) ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`get_app_task_id` # for internal use # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`get_app_task_id`%% CREATE FUNCTION `mysql_tasks`.`get_app_task_id`( app_user_id VARCHAR(255), id_or_alias VARCHAR(36) ) RETURNS VARCHAR(36) READS SQL DATA SQL SECURITY INVOKER COMMENT ' Returns an application task UUID. Parameters: - app_user_id: application user id to filter the list on - id_or_alias: task UUID or its unique alias. if UUID, returns it. if alias, looks up its UUID.' BEGIN DECLARE task_ids JSON; DECLARE task_id VARCHAR(36); IF (CHAR_LENGTH(id_or_alias) = 36) THEN -- If argument has the exact length of UUID, assume task_id is provided SELECT id_or_alias INTO task_id; ELSE -- Otherwise, assume alias is provided -- Replace task_alias with task_id SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`get_app_task_ids_from_alias`( app_user_id, id_or_alias ) INTO task_ids; IF JSON_LENGTH(task_ids) > 1 THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Multiple tasks for this alias, re-run using a' ' task_id. For a list IDs: SELECT' ' mysql_tasks.get_task_ids_from_alias(alias);'; END IF; SELECT task_ids->>'$[0]' INTO task_id; END IF; RETURN task_id; END%% # ----------------------------------------------------- # Function `mysql_tasks`.`get_task_id` # for internal use # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`get_task_id`%% CREATE FUNCTION `mysql_tasks`.`get_task_id`( id_or_alias VARCHAR(36) ) RETURNS VARCHAR(36) READS SQL DATA SQL SECURITY INVOKER COMMENT ' Returns a task UUID. Parameters: - id_or_alias: task UUID or its unique alias. if UUID, returns it. if alias, looks up its UUID.' BEGIN RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`get_app_task_id`(NULL, id_or_alias) ); END%% # ----------------------------------------------------- # Function `mysql_tasks`.`get_app_task_alias` # for internal use # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`get_app_task_alias`%% CREATE FUNCTION `mysql_tasks`.`get_app_task_alias`( app_user_id VARCHAR(255), id VARCHAR(36) ) RETURNS VARCHAR(16) READS SQL DATA SQL SECURITY INVOKER COMMENT ' Returns an application task alias given its UUID. Parameters: - app_user_id: application user id to filter the list on - id: task UUID' BEGIN DECLARE task_info JSON; SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`app_task`(app_user_id, id) INTO task_info; RETURN task_info->>'$.alias'; END%% # ----------------------------------------------------- # Function `mysql_tasks`.`get_task_alias` # for internal use # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`get_task_alias`%% CREATE FUNCTION `mysql_tasks`.`get_task_alias`( id VARCHAR(36) ) RETURNS VARCHAR(16) READS SQL DATA SQL SECURITY INVOKER COMMENT ' Returns a task alias given its UUID. Parameters: - id: task UUID' BEGIN RETURN ( SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`get_app_task_alias`(NULL, id) ); END%% # ----------------------------------------------------- # FUNCTION `mysql_tasks`.`quote_identifier` # for internal use # (copy of sys.quote_identifier to avoid permission issues) # ----------------------------------------------------- DROP FUNCTION IF EXISTS `mysql_tasks`.`quote_identifier`%% CREATE FUNCTION `mysql_tasks`.`quote_identifier`(in_identifier TEXT) RETURNS TEXT CHARSET UTF8MB4 SQL SECURITY INVOKER DETERMINISTIC NO SQL BEGIN RETURN CONCAT('`', REPLACE(in_identifier, '`', '``'), '`'); END%% # ----------------------------------------------------- # Procedure `mysql_tasks`.`create_app_task` # ----------------------------------------------------- DROP PROCEDURE IF EXISTS `mysql_tasks`.`create_app_task`%% CREATE PROCEDURE `mysql_tasks`.`create_app_task`( IN `app_user_id` VARCHAR(255), IN `name` VARCHAR(255), IN `task_type` VARCHAR(45), IN `data` JSON, IN `data_json_schema` JSON, IN `log_data_json_schema` JSON, OUT `task_id` VARCHAR(36)) SQL SECURITY INVOKER COMMENT ' Creates an application task and returns its UUID. Parameters: - app_user_id: application user id to filter the list on - name: name for the task - task_type: type for the task - data: JSON field holding additional task data - data_json_schema: JSON schema for the data field - log_data_json_schema: JSON schema for task log\'s data filed - OUT task_id: UUID of the created task' BEGIN SET task_id = UUID(); CALL `mysql_tasks`.`create_app_task_with_id`( `app_user_id`, `task_id`, `name`, `task_type`, `data`, `data_json_schema`, `log_data_json_schema` ); END%% # ----------------------------------------------------- # Procedure `mysql_tasks`.`create_app_task_with_id` # ----------------------------------------------------- DROP PROCEDURE IF EXISTS `mysql_tasks`.`create_app_task_with_id`%% CREATE PROCEDURE `mysql_tasks`.`create_app_task_with_id`( IN `app_user_id` VARCHAR(255), IN `id` VARCHAR(36), IN `name` VARCHAR(255), IN `task_type` VARCHAR(45), IN `data` JSON, IN `data_json_schema` JSON, IN `log_data_json_schema` JSON) SQL SECURITY INVOKER COMMENT ' Creates an application task given its UUID. Parameters: - app_user_id: application user id to filter the list on - id: task UUID - name: name for the task - task_type: type for the task - data: JSON field holding additional task data - data_json_schema: JSON schema for the data field - log_data_json_schema: JSON schema for task log\'s data filed' BEGIN -- insert entry into task table INSERT INTO `mysql_tasks`.`task_i`(`id`, `app_user_id`, `server_uuid`, `name`, `connection_id`, `task_type`, `data`, `data_json_schema`, `log_data_json_schema`) VALUES (UUID_TO_BIN(id, 1), app_user_id, UUID_TO_BIN(@@server_uuid, 1), `name`, CONNECTION_ID(), task_type, `data`, `data_json_schema`, `log_data_json_schema`); IF `data_json_schema` IS NOT NULL AND NOT JSON_SCHEMA_VALID(`data_json_schema`, `data`) THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'The provided task data does not conform to the given' ' data_json_schema.', MYSQL_ERRNO = 5400; END IF; CALL `mysql_tasks`.`add_task_log`( id, 'Task created by user.', NULL, 0, 'SCHEDULED' ); END%% # ----------------------------------------------------- # Procedure `mysql_tasks`.`create_task` # ----------------------------------------------------- DROP PROCEDURE IF EXISTS `mysql_tasks`.`create_task`%% CREATE PROCEDURE `mysql_tasks`.`create_task`( IN `name` VARCHAR(255), IN `task_type` VARCHAR(45), IN `data` JSON, OUT `task_id` VARCHAR(36) ) SQL SECURITY INVOKER COMMENT ' Creates a task and returns its UUID. Parameters: - name: name for the task - task_type: type for the task - data: JSON field holding additional task data - OUT task_id: UUID of the created task' BEGIN CALL `mysql_tasks`.`create_app_task`( NULL, `name`, task_type, `data`, NULL, NULL, `task_id`); END%% # ----------------------------------------------------- # Procedure `mysql_tasks`.`create_task_with_id` # ----------------------------------------------------- DROP PROCEDURE IF EXISTS `mysql_tasks`.`create_task_with_id`%% CREATE PROCEDURE `mysql_tasks`.`create_task_with_id`( IN `id` VARCHAR(36), IN `name` VARCHAR(255), IN `task_type` VARCHAR(45), IN `data` JSON ) SQL SECURITY INVOKER COMMENT ' Creates a task given its UUID. Parameters: - id: task UUID - name: name for the task - task_type: type for the task - data: JSON field holding additional task data' BEGIN CALL `mysql_tasks`.`create_app_task_with_id`( NULL, `id`, `name`, `task_type`, `data`, NULL, NULL); END%% # ----------------------------------------------------- # Procedure `mysql_tasks`.`add_task_log` # ----------------------------------------------------- DROP PROCEDURE IF EXISTS `mysql_tasks`.`add_task_log`%% CREATE PROCEDURE `mysql_tasks`.`add_task_log`( IN `task_id` VARCHAR(36), IN `message` VARCHAR(2000), IN `data` JSON, IN `progress` SMALLINT, IN `status` ENUM('SCHEDULED', 'RUNNING', 'COMPLETED', 'ERROR', 'CANCELLED') ) SQL SECURITY INVOKER COMMENT ' Persists a task log entry. Parameters: - task_id: UUID of the task owning the log entry - message: a log message - data: JSON field holding additional log data - progress: task execution progress (0 - 100) - status: enumeration for the task status' BEGIN DECLARE log_id BINARY(16) DEFAULT UUID_TO_BIN(UUID(), 1); DECLARE log_data_json_schema JSON DEFAULT NULL; SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`get_task_log_data_json_schema`(task_id) INTO log_data_json_schema; IF `log_data_json_schema` IS NOT NULL AND NOT JSON_SCHEMA_VALID(`log_data_json_schema`, `data`) THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'The provided log data does not conform to the' ' given log_data_json_schema.', MYSQL_ERRNO = 5400; END IF; INSERT INTO `mysql_tasks`.`task_log_i` ( `id`, `task_id`, `log_time`, `message`, `data`, `progress`, `status`) VALUES ( log_id, UUID_TO_BIN(task_id, 1), NOW(6), message, `data`, progress, `status` ); END%% # ----------------------------------------------------- # Procedure `mysql_tasks`.`kill_app_task` # ----------------------------------------------------- DROP PROCEDURE IF EXISTS `mysql_tasks`.`kill_app_task`%% CREATE PROCEDURE `mysql_tasks`.`kill_app_task`( IN `app_user_id` VARCHAR(255), IN `id_or_alias` VARCHAR(36) ) SQL SECURITY INVOKER COMMENT ' Kills an application task. Parameters: - app_user_id: application user id to filter the list on - id_or_alias: UUID or a unique alias of the task to kill.' BEGIN DECLARE task_id VARCHAR(36); DECLARE task_status JSON DEFAULT NULL; DECLARE status TEXT DEFAULT NULL; DECLARE suuid VARCHAR(36) DEFAULT NULL; DECLARE cid BIGINT UNSIGNED DEFAULT NULL; DECLARE i INT DEFAULT 0; DECLARE event_name TEXT DEFAULT NULL; SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`get_app_task_id`(app_user_id, id_or_alias) INTO task_id; SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`app_task_status`(app_user_id, task_id) INTO task_status; IF task_status IS NULL THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Task does not exist or not allowed to access it.'; END IF; SET status = JSON_UNQUOTE(JSON_EXTRACT(task_status, '$.status')); SET suuid = JSON_UNQUOTE(JSON_EXTRACT(task_status, '$.server_uuid')); SET cid = JSON_UNQUOTE(JSON_EXTRACT(task_status, '$.connection_id')); IF suuid <> @@server_uuid THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Tasks started on other servers cannot be killed.'; END IF; IF (status <> 'RUNNING' AND status <> 'SCHEDULED') THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Task inactive.'; END IF; -- kill process SET @stmt = CONCAT('KILL ', cid); PREPARE stmt FROM @stmt; EXECUTE stmt; DEALLOCATE PREPARE stmt; -- kill task monitor IF JSON_LENGTH(task_status->'$.task_data.mysqlMetadata.events') > 1 THEN CALL `mysql_tasks`.`stop_task_monitor`( task_status->'$.task_data.mysqlMetadata.events[last]', task_id); END IF; -- drop associated events IF JSON_CONTAINS_PATH( task_status, 'one', '$.task_data.mysqlMetadata.events') THEN WHILE i < JSON_LENGTH(task_status->'$.task_data.mysqlMetadata.events') DO SET event_name = JSON_UNQUOTE( JSON_EXTRACT( task_status->'$.task_data.mysqlMetadata.events', CONCAT('$[',i,']') )); CALL `mysql_tasks`.`drop_event`(event_name); SELECT i+1 INTO i; END WHILE; END IF; CALL `mysql_tasks`.`add_task_log`( task_id, 'Cancelled by user.', NULL, 100, 'CANCELLED'); END%% # ----------------------------------------------------- # Procedure `mysql_tasks`.`kill_task` # ----------------------------------------------------- DROP PROCEDURE IF EXISTS `mysql_tasks`.`kill_task`%% CREATE PROCEDURE `mysql_tasks`.`kill_task`(IN `id_or_alias` VARCHAR(36)) SQL SECURITY INVOKER COMMENT ' Kills a task. Parameters: - app_user_id: application user id to filter the list on - id_or_alias: UUID or a unique alias of the task to kill.' BEGIN CALL `mysql_tasks`.`kill_app_task`(NULL, id_or_alias); END%% # ----------------------------------------------------- # Procedure `mysql_tasks`.`drop_event` # for internal use # ----------------------------------------------------- DROP PROCEDURE IF EXISTS `mysql_tasks`.`drop_event`%% CREATE PROCEDURE `mysql_tasks`.`drop_event`(IN `event_name` TEXT) SQL SECURITY INVOKER COMMENT ' Drops an event owned by a task. Parameters: - event_name: the name of the event to be dropped.' BEGIN DECLARE event_cmnt TEXT DEFAULT NULL; DECLARE version_string VARCHAR(255); DECLARE major INT UNSIGNED; DECLARE minor INT UNSIGNED; DECLARE patch INT UNSIGNED; DECLARE task_mgmt_major INT UNSIGNED; DECLARE task_mgmt_minor INT UNSIGNED; DECLARE task_mgmt_patch INT UNSIGNED; -- get schema version SELECT /*+ SET_VAR(use_secondary_engine=off) */ v.major, v.minor, v.patch INTO task_mgmt_major, task_mgmt_minor, task_mgmt_patch FROM `mysql_tasks`.`msm_schema_version` v; -- get comment from the runner event SELECT /*+ SET_VAR(use_secondary_engine=off) */ EVENT_COMMENT INTO event_cmnt FROM information_schema.events e WHERE CONCAT(mysql_tasks.quote_identifier(e.EVENT_SCHEMA), '.', mysql_tasks.quote_identifier(e.EVENT_NAME)) = `event_name` AND e.EVENT_COMMENT LIKE 'mysql_tasks_schema_version=%'; SET version_string = SUBSTRING_INDEX( event_cmnt, 'mysql_tasks_schema_version=', -1); -- Get the part after "=" SET major = CAST(SUBSTRING_INDEX(version_string, '.', 1) AS UNSIGNED); SET minor = CAST( SUBSTRING_INDEX( SUBSTRING_INDEX(version_string, '.', 2), '.', -1 ) AS UNSIGNED ); SET patch = CAST(SUBSTRING_INDEX(version_string, '.', -1) AS UNSIGNED); -- drop event if it has the comment with the supported version IF (event_cmnt IS NOT NULL) AND (major < task_mgmt_major OR (major = task_mgmt_major AND (minor < task_mgmt_minor OR (minor = task_mgmt_minor AND patch <= task_mgmt_patch)))) THEN SET @stmt = CONCAT('DROP EVENT IF EXISTS ', event_name); PREPARE stmt FROM @stmt; EXECUTE stmt; DEALLOCATE PREPARE stmt; END IF; END%% # ----------------------------------------------------- # Procedure `mysql_tasks`.`execute_prepared_stmt_from_app_async` # ----------------------------------------------------- DROP PROCEDURE IF EXISTS `mysql_tasks`.`execute_prepared_stmt_from_app_async`%% CREATE PROCEDURE `mysql_tasks`.`execute_prepared_stmt_from_app_async`( IN `sql_statements` TEXT, IN `app_user_id` VARCHAR(255), IN `schema_name` VARCHAR(255), IN `task_type` VARCHAR(80), IN `task_name` VARCHAR(255), IN `task_data` JSON, IN `data_json_schema` JSON, IN `log_data_json_schema` JSON, IN `progress_monitor_sql_statements` TEXT, IN `progress_monitor_refresh_period` DECIMAL(5,2), OUT task_id VARCHAR(36)) SQL SECURITY INVOKER COMMENT ' Executes a prepared statement asynchronously from an application. Parameters: - sql_statements: one or multiple SQL statements (separated by ;) that are to be executed in a detached thread - app_user_id: application user id to filter the list on - schema_name: name of the schema that hosts MySQL Events running in dedicated threads - task_type: type of the task - task_name: name of the task - task_data: additional data/metadata of a task - data_json_schema: JSON schema of the task_data field - log_data_json_schema: JSON schema for the data field for the task logs - progress_monitor_sql_statements: one or multiple SQL statements (separated by ;) that are executed periodically and that can update the progress of the background task - progress_monitor_refresh_period: interval at which progress monitor sql statements are executed (in seconds) - OUT task_id: UUID of the task that runs the prepared statement' BEGIN DECLARE event_name, progress_event_name TEXT DEFAULT NULL; DECLARE task_mgmt_version TEXT DEFAULT NULL; DECLARE max_parallel_tasks INT UNSIGNED DEFAULT NULL; DECLARE active_task_cnt INT UNSIGNED DEFAULT NULL; DECLARE internal_data JSON DEFAULT NULL; DECLARE internal_data_json_schema JSON DEFAULT NULL; DECLARE data_json_schema_required JSON DEFAULT NULL; DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN GET DIAGNOSTICS CONDITION 1 @p1 = RETURNED_SQLSTATE, @p2 = MYSQL_ERRNO, @p3 = MESSAGE_TEXT; DO RELEASE_LOCK('execute_prepared_stmt_async'); IF @p3 = 'No schema set.' THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No schema set. Please pass in the schema name' ' or set the current schema with USE.', MYSQL_ERRNO = 5400; ELSE RESIGNAL; END IF; END; SELECT /*+ SET_VAR(use_secondary_engine=off) */ JSON_EXTRACT(data, '$.limits.maximumPreparedStmtAsyncTasks') INTO max_parallel_tasks FROM `mysql_tasks`.`config` WHERE id = 1 LIMIT 1; IF progress_monitor_refresh_period IS NULL THEN SET progress_monitor_refresh_period = 5; ELSEIF progress_monitor_refresh_period <= 0 THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'progress_monitor_refresh_period must be a ' 'positive number', MYSQL_ERRNO = 5400; END IF; IF task_type IS NULL THEN SET task_type = 'Async_SQL'; END IF; IF task_name IS NULL THEN SET task_name = 'execute_prepared_stmt_async'; END IF; IF schema_name IS NULL THEN SELECT /*+ SET_VAR(use_secondary_engine=off) */ current_schema INTO schema_name FROM `performance_schema`.`events_statements_current` WHERE thread_id=PS_CURRENT_THREAD_ID() AND nesting_event_level=0; END IF; IF schema_name IS NULL THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No schema set.', MYSQL_ERRNO = 5400; END IF; -- ensure the SQL statements end with a semicolon IF NOT REGEXP_LIKE(sql_statements, ';[:space:]*$') THEN SET sql_statements = CONCAT(sql_statements, '; '); END IF; -- make sure the reserved property is not used IF JSON_CONTAINS_PATH( data_json_schema, 'one', '$.properties.mysqlMetadata') THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'data_json_schema must not' ' contain a reserved property "mysqlMetadata".', MYSQL_ERRNO = 5400; END IF; SET internal_data_json_schema = JSON_OBJECT( "type", "object", "properties", JSON_OBJECT( "mysqlMetadata", JSON_OBJECT( "type", "object", "properties", JSON_OBJECT( "events", JSON_OBJECT( "type", "array", "items", JSON_OBJECT( "type", "string" ), "minItems", 1, "uniqueItems", true ), "autoGc", JSON_OBJECT( "type", "boolean" ) ), "required", JSON_ARRAY("events", "autoGc") ) ), "required", JSON_ARRAY("mysqlMetadata") ); -- merge the provided schema and the internal schema SET data_json_schema = COALESCE( data_json_schema, JSON_OBJECT("required", JSON_ARRAY())); SELECT JSON_UNQUOTE(JSON_MERGE_PRESERVE( JSON_EXTRACT(internal_data_json_schema, '$.required'), JSON_EXTRACT(data_json_schema, '$.required') )) INTO data_json_schema_required; SET data_json_schema = JSON_MERGE_PATCH( internal_data_json_schema, data_json_schema); SET data_json_schema = JSON_SET( data_json_schema, '$.required', data_json_schema_required); SELECT /*+ SET_VAR(use_secondary_engine=off) */ CONCAT( mysql_tasks.quote_identifier(schema_name), '.', mysql_tasks.quote_identifier(UUID()) ) INTO event_name; SELECT /*+ SET_VAR(use_secondary_engine=off) */ CONCAT( mysql_tasks.quote_identifier(schema_name), '.', mysql_tasks.quote_identifier(UUID()) ) INTO progress_event_name; SELECT /*+ SET_VAR(use_secondary_engine=off) */ CONCAT(major, '.', minor, '.', patch) FROM `mysql_tasks`.`msm_schema_version` INTO task_mgmt_version; SET internal_data = JSON_OBJECT( "mysqlMetadata", JSON_OBJECT( "events", IF (progress_monitor_sql_statements IS NULL, JSON_ARRAY(event_name), JSON_ARRAY(event_name, progress_event_name) ), "autoGc", true ) ); -- make sure reserved property is not used IF JSON_CONTAINS_PATH(task_data, 'one', '$.mysqlMetadata') THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'task_data must not contain a' ' reserved property "mysqlMetadata".', MYSQL_ERRNO = 5400; END IF; -- merge the provided task data and the internal metadata SET task_data = COALESCE(task_data, JSON_OBJECT()); SET task_data = JSON_MERGE_PATCH(internal_data, task_data); SELECT GET_LOCK('execute_prepared_stmt_async', 2) INTO @__lock; IF @__lock = 0 THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Cannot acquire lock.' ' Try again later', MYSQL_ERRNO = 5400; END IF; -- READ COMMITTED does not acquire lock on the task_log table, -- avoiding a potential deadlock between this SELECT and -- INSERTS in concurrent events SET TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT /*+ SET_VAR(use_secondary_engine=off) */ `mysql_tasks`.`active_task_count`(NULL) INTO active_task_cnt; COMMIT ; IF active_task_cnt >= max_parallel_tasks THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Maximum number of parallel' ' tasks reached, try again later.', MYSQL_ERRNO = 5400; END IF; SET task_id = UUID(); SET @eventSql = CONCAT( 'CREATE EVENT ', event_name, ' ', 'ON SCHEDULE AT NOW() ON COMPLETION NOT PRESERVE ENABLE ', 'COMMENT "mysql_tasks_schema_version=', task_mgmt_version, '" ', 'DO BEGIN ', 'DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN ', ' GET DIAGNOSTICS CONDITION 1', ' @p1 = RETURNED_SQLSTATE,', ' @p2 = MYSQL_ERRNO,', ' @p3 = MESSAGE_TEXT; ', ' CALL `mysql_tasks`.`stop_task_monitor`(', QUOTE(progress_event_name), ', ', QUOTE(task_id), '); ', ' CALL `mysql_tasks`.`add_task_log`("', task_id, '", CONCAT("Error: ", @p3), NULL, 100, "ERROR"); ', 'END; ', 'SET ROLE ALL; ', 'SET @task_id ="', task_id, '"; SET @task_result = NULL; ', 'CALL `mysql_tasks`.`create_app_task_with_id`(', IF(app_user_id IS NULL, 'NULL', QUOTE(app_user_id)), ', @task_id, ', QUOTE(task_name), ', ', QUOTE(task_type), ', ', QUOTE(task_data), ', ', QUOTE(data_json_schema), ', ', QUOTE(log_data_json_schema), '); ', 'CALL `mysql_tasks`.`start_task_monitor`(', QUOTE(progress_event_name), ', ', QUOTE(task_id), ', ', QUOTE(progress_monitor_sql_statements), ', ', progress_monitor_refresh_period, '); ' 'CALL `mysql_tasks`.`add_task_log`(', '@task_id, "Event execution started...", NULL, 0, "RUNNING"); ', sql_statements, 'CALL `mysql_tasks`.`stop_task_monitor`(', QUOTE(progress_event_name), ', ', QUOTE(task_id), '); ', 'CALL `mysql_tasks`.`add_task_log`(', '@task_id, "Execution finished.", ', 'CAST(@task_result AS JSON), 100, "COMPLETED"); ', 'SET @task_id = NULL; SET @task_result = NULL; ', 'END;'); PREPARE dynamic_statement FROM @eventSql; EXECUTE dynamic_statement; DEALLOCATE PREPARE dynamic_statement; SELECT RELEASE_LOCK('execute_prepared_stmt_async') INTO @__lock; SET @__lock = NULL; END%% # ----------------------------------------------------- # Procedure `mysql_tasks`.`execute_prepared_stmt_async` # ----------------------------------------------------- DROP PROCEDURE IF EXISTS `mysql_tasks`.`execute_prepared_stmt_async`%% CREATE PROCEDURE `mysql_tasks`.`execute_prepared_stmt_async`( IN `sql_statements` TEXT, IN `schema_name` VARCHAR(255), IN `task_name` VARCHAR(255), IN `task_data` JSON, OUT task_id VARCHAR(36)) SQL SECURITY INVOKER COMMENT ' Executes a prepared statement asynchronously. Parameters: - sql_statements: one or multiple SQL statements (separated by ;) that are to be executed in a detached thread - schema_name: name of the schema that hosts MySQL Events running in dedicated threads - task_name: name of the task - task_data: additional data/metadata of a task - OUT task_id: UUID of the task that runs the prepared statement' BEGIN CALL `mysql_tasks`.`execute_prepared_stmt_from_app_async`( `sql_statements`, NULL, `schema_name`, NULL, `task_name`, `task_data`, NULL, NULL, NULL, NULL, task_id); END%% # ----------------------------------------------------- # Procedure `mysql_tasks`.`start_task_monitor` # for internal use # ----------------------------------------------------- DROP PROCEDURE IF EXISTS `mysql_tasks`.`start_task_monitor`%% CREATE PROCEDURE `mysql_tasks`.`start_task_monitor`( IN `event_name` TEXT, IN `task_id` VARCHAR(36), IN `sql_statements` TEXT, IN `refresh_period` DECIMAL(5,2) ) SQL SECURITY INVOKER COMMENT ' Starts a task monitor SQL Event. It periodically updates the task progress. Parameters: - event_name: fully qualified name of the SQL Event that runs the monitoring SQL statements - task_id: UUID of the task being monitored - sql_statements: the monitoring SQL statements (separated by ;) - refresh_period: interval at which the monitoring sql statements are executed (in seconds)' BEGIN DECLARE task_mgmt_version TEXT DEFAULT NULL; IF sql_statements IS NOT NULL AND event_name IS NOT NULL THEN IF refresh_period IS NULL THEN SET refresh_period = 5; ELSEIF refresh_period <= 0 THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'refresh_period must be a positive number', MYSQL_ERRNO = 5400; END IF; SELECT /*+ SET_VAR(use_secondary_engine=off) */ CONCAT(major, '.', minor, '.', patch) FROM `mysql_tasks`.`msm_schema_version` INTO task_mgmt_version; -- ensure the SQL statements end with a semicolon IF NOT REGEXP_LIKE(sql_statements, ';[:space:]*$') THEN SET sql_statements = CONCAT(sql_statements, '; '); END IF; SET @eventSql = CONCAT( 'CREATE EVENT ', event_name, ' ', 'ON SCHEDULE AT NOW() ON COMPLETION NOT PRESERVE ENABLE ', 'COMMENT "mysql_tasks_schema_version=', task_mgmt_version, '" ', 'DO BEGIN ', 'DECLARE do_run BOOLEAN DEFAULT TRUE; ', 'DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN ', ' GET DIAGNOSTICS CONDITION 1 ', ' @p1 = RETURNED_SQLSTATE, @p2 = MYSQL_ERRNO, @p3 = MESSAGE_TEXT; ', ' CALL `mysql_tasks`.`add_task_log`(', QUOTE(task_id),', CONCAT("Error: ", @p3), NULL, 100, "ERROR"); ', 'END; ', 'SET ROLE ALL; ', 'SET @task_id =', QUOTE(task_id), '; ', -- synchronization with thread calling end_task_monitor -- if the event was dropped (not found in information_schema.events) -- terminate it 'IF (GET_LOCK(', QUOTE(event_name),', 60) = 1 AND ', '(SELECT COUNT(*)>0 FROM information_schema.events ise ', ' WHERE CONCAT(mysql_tasks.quote_identifier(ise.event_schema), ', ' ".", mysql_tasks.quote_identifier(ise.event_name)) = ', QUOTE(event_name), ')) THEN ', ' CALL `mysql_tasks`.`add_task_log`(@task_id, ', ' "Progress monitor started.", ', ' JSON_OBJECT("connection_id", CONNECTION_ID()), 0, "RUNNING"); ', 'END IF; ', 'WHILE IS_USED_LOCK(', QUOTE(event_name), ') <=> CONNECTION_ID() DO ', ' DO RELEASE_LOCK(', QUOTE(event_name), '); ', 'END WHILE; ', -- if at any time event gets killed, terminate its while loop 'WHILE (SELECT COUNT(*)>0 FROM information_schema.events ise ', ' WHERE CONCAT(mysql_tasks.quote_identifier(ise.event_schema), ', ' ".", mysql_tasks.quote_identifier(ise.event_name)) = ', QUOTE(event_name), ') DO ', sql_statements, ' DO SLEEP(', refresh_period, '); ', 'END WHILE; ', 'SET @task_id = NULL; ', 'END' ); PREPARE dynamic_statement FROM @eventSql; EXECUTE dynamic_statement; DEALLOCATE PREPARE dynamic_statement; END IF; END%% # ----------------------------------------------------- # Procedure `mysql_tasks`.`stop_task_monitor` # for internal use # ----------------------------------------------------- DROP PROCEDURE IF EXISTS `mysql_tasks`.`stop_task_monitor`%% CREATE PROCEDURE `mysql_tasks`.`stop_task_monitor`( IN `event_name` TEXT, IN `task_id` VARCHAR(36) ) SQL SECURITY INVOKER COMMENT ' Stops a task monitor SQL Event. Parameters: - event_name: fully qualified name of the SQL Event that runs the monitoring SQL statements - task_id: ID of the task for which the task monitor has been started' BEGIN DECLARE log_msg JSON DEFAULT NULL; DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN DO RELEASE_LOCK(QUOTE(event_name)); END; IF event_name IS NOT NULL THEN -- synchronization with thread running task_monitor DO GET_LOCK(event_name, 60); SELECT `mysql_tasks`.`find_task_log_msg`( task_id, 'Progress monitor started.') INTO log_msg; IF (log_msg IS NOT NULL) THEN -- due to a race condition, we might not enter here -- but the task monitor event may be started SET @killSql = CONCAT( 'KILL ', log_msg->>'$.data.connection_id' ); PREPARE dynamic_statement FROM @killSql; EXECUTE dynamic_statement; DEALLOCATE PREPARE dynamic_statement; END IF; -- dropping the event will not kill its active process SET @eventSql = CONCAT( 'DROP EVENT IF EXISTS ', event_name ); PREPARE dynamic_statement FROM @eventSql; EXECUTE dynamic_statement; DEALLOCATE PREPARE dynamic_statement; WHILE IS_USED_LOCK(event_name) <=> CONNECTION_ID() DO DO RELEASE_LOCK(event_name); END WHILE; END IF; END%% # ----------------------------------------------------- # EVENT `mysql_tasks`.`task_cleanup` # Periodically cleans up data from old tasks. # Old tasks must have finished (COMPLETED, CANCELED) # at least 6 days ago. # ----------------------------------------------------- DROP EVENT IF EXISTS `mysql_tasks`.`task_cleanup`%% CREATE EVENT `mysql_tasks`.`task_cleanup` ON SCHEDULE EVERY 1 DAY ON COMPLETION NOT PRESERVE ENABLE COMMENT 'Clean up old tasks' DO BEGIN DECLARE task_ids_to_del JSON DEFAULT NULL; -- find all tasks with last tog time -- 6 days or older from the current date SELECT /*+ SET_VAR(use_secondary_engine=off) */ JSON_ARRAYAGG( BIN_TO_UUID(last_tl.task_id, 1) ) INTO @task_ids_to_del FROM ( SELECT task_id, MAX(log_time) AS last_log_time FROM `mysql_tasks`.`task_log_impl` GROUP BY task_id ) tl LEFT OUTER JOIN `mysql_tasks`.`task_log_impl` last_tl ON tl.task_id = last_tl.task_id AND tl.last_log_time = last_tl.log_time WHERE (last_tl.status <> 'SCHEDULED' AND last_tl.status <> 'RUNNING') AND last_tl.log_time <= DATE_SUB(CURDATE(), INTERVAL 6 DAY); -- delete all old tasks DELETE FROM `mysql_tasks`.`task_impl` WHERE BIN_TO_UUID(id, 1) MEMBER OF(@task_ids_to_del); END%% # ----------------------------------------------------- # EVENT `mysql_tasks`.`task_gc` # Periodic garbage collector that cleans up tasks # which are in the active state but their # process does not exist. # ----------------------------------------------------- DROP EVENT IF EXISTS `mysql_tasks`.`task_gc`%% CREATE EVENT `mysql_tasks`.`task_gc` ON SCHEDULE EVERY 10 SECOND ON COMPLETION NOT PRESERVE ENABLE COMMENT 'Garbage collector' DO BEGIN DECLARE i, j INT DEFAULT 0; DECLARE json_id JSON DEFAULT NULL; DECLARE json_user JSON DEFAULT NULL; DECLARE json_data JSON DEFAULT NULL; DECLARE curr_id VARCHAR(36) DEFAULT NULL; DECLARE curr_user TEXT DEFAULT NULL; DECLARE curr_data JSON DEFAULT NULL; DECLARE event_name TEXT DEFAULT NULL; -- Find active tasks without alive process SELECT /*+ SET_VAR(use_secondary_engine=off) */ JSON_ARRAYAGG( BIN_TO_UUID(t.id, 1)), JSON_ARRAYAGG(t.mysql_user), JSON_ARRAYAGG(t.data) INTO json_id, json_user, json_data FROM `mysql_tasks`.`task_log_impl` tl JOIN ( SELECT task_id, MAX(log_time) AS max_log_time, MAX(id) AS max_id FROM `mysql_tasks`.`task_log_impl` GROUP BY task_id ) tl2 ON tl.log_time = tl2.max_log_time JOIN `mysql_tasks`.`task_impl` t ON tl.task_id = t.id LEFT JOIN `performance_schema`.`processlist` p ON t.connection_id = p.id WHERE (tl.status = 'RUNNING' OR tl.status = 'SCHEDULED') AND p.id IS NULL AND t.server_uuid = UUID_TO_BIN(@@server_uuid, 1) AND t.data->'$.mysqlMetadata.autoGc' = true; WHILE i < JSON_LENGTH(json_id) DO SET curr_id = JSON_UNQUOTE(JSON_EXTRACT(json_id, CONCAT('$[', i, ']'))); SET curr_user = JSON_UNQUOTE(JSON_EXTRACT(json_user, CONCAT('$[', i, ']'))); SET curr_data = JSON_EXTRACT(json_data, CONCAT('$[', i, ']')); IF curr_data IS NOT NULL AND JSON_CONTAINS_PATH(curr_data, 'one', '$.mysqlMetadata.events') THEN SET j = 0; WHILE j < JSON_LENGTH( JSON_EXTRACT(curr_data, '$.mysqlMetadata.events')) DO SET event_name = JSON_UNQUOTE( JSON_EXTRACT(curr_data, CONCAT('$.mysqlMetadata.events[', j, ']'))); CALL `mysql_tasks`.`drop_event`(event_name); SET j = j + 1; END WHILE; END IF; IF curr_id IS NOT NULL THEN INSERT INTO `mysql_tasks`.`task_log_impl`( `id`, `mysql_user`, `task_id`, `log_time`, `message`, `data`, `progress`, `status`) VALUES ( UUID_TO_BIN(UUID(), 1), curr_user, UUID_TO_BIN(curr_id, 1), NOW(6), 'Cleaned up by system.', NULL, 100, 'CANCELLED'); END IF; SELECT i + 1 INTO i; END WHILE; -- Find dangling events from tasks which are no longer running SELECT /*+ SET_VAR(use_secondary_engine=off) */ JSON_ARRAYAGG(BIN_TO_UUID(t.id, 1)), JSON_ARRAYAGG(t.data) INTO json_id, json_data FROM `mysql_tasks`.`task_log_impl` tl JOIN ( SELECT task_id, MAX(log_time) AS max_log_time, MAX(id) AS max_id FROM `mysql_tasks`.`task_log_impl` GROUP BY task_id ) tl2 ON tl.log_time = tl2.max_log_time JOIN `mysql_tasks`.`task_impl` t ON tl.task_id = t.id JOIN information_schema.events ise ON JSON_UNQUOTE( t.data->'$.mysqlMetadata.events[0]') = CONCAT(mysql_tasks.quote_identifier( ise.EVENT_SCHEMA), '.', mysql_tasks.quote_identifier(ise.EVENT_NAME)) LEFT JOIN `performance_schema`.`processlist` p ON t.connection_id = p.id WHERE (tl.status <> 'RUNNING' AND tl.status <> 'SCHEDULED') AND p.id IS NULL AND t.server_uuid = UUID_TO_BIN(@@server_uuid, 1) AND t.data->'$.mysqlMetadata.autoGc' = true; WHILE i < JSON_LENGTH(json_id) DO SET curr_data = JSON_EXTRACT(json_data, CONCAT('$[', i, ']')); IF curr_data IS NOT NULL AND JSON_CONTAINS_PATH(curr_data, 'one', '$.mysqlMetadata.events') THEN SET j = 0; WHILE j < JSON_LENGTH( JSON_EXTRACT(curr_data, '$.mysqlMetadata.events')) DO SET event_name = JSON_UNQUOTE( JSON_EXTRACT(curr_data, CONCAT('$.mysqlMetadata.events[', j, ']'))); CALL `mysql_tasks`.`drop_event`(event_name); SET j = j + 1; END WHILE; END IF; SELECT i + 1 INTO i; END WHILE; END%% DELIMITER ;%% # ############################################################################# # MSM Section 170: Authorization # ----------------------------------------------------------------------------- # This section is used to define the ROLEs and GRANT statements. # ----------------------------------------------------------------------------- # The mysql_task_admin ROLE allows to fully manage the MySQL tasks schema # The mysql_task_user ROLE allows to create and work with MySQL tasks CREATE ROLE IF NOT EXISTS 'mysql_task_admin', 'mysql_task_user'; # GRANTS for mysql_task_admin GRANT ALL ON `mysql_tasks`.* TO 'mysql_task_admin'; GRANT SELECT ON `performance_schema`.`events_statements_current` TO 'mysql_task_admin'; # GRANTS for mysql_task_user GRANT SELECT ON `mysql_tasks`.`config` TO 'mysql_task_user'; GRANT SELECT ON `mysql_tasks`.`msm_schema_version` TO 'mysql_task_user'; GRANT INSERT ON `mysql_tasks`.`task_i` TO 'mysql_task_user'; GRANT INSERT ON `mysql_tasks`.`task_log_i` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`task_list` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`task` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`task_logs` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`task_status_list` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`task_status` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`task_status_brief` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`active_task_count` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`active_user_task_count` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`find_task_log_msg` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_task_ids_from_alias` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_task_id` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_task_alias` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_task_log_data_json_schema` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_task_connection_id` TO 'mysql_task_user'; GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`create_task` TO 'mysql_task_user'; GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`create_task_with_id` TO 'mysql_task_user'; GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`add_task_log` TO 'mysql_task_user'; GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`kill_task` TO 'mysql_task_user'; GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`drop_event` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`app_task_list` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`app_task` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`app_task_logs` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`app_task_status_list` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`app_task_status` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`app_task_status_brief` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_app_task_ids_from_alias` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_app_task_id` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`get_app_task_alias` TO 'mysql_task_user'; GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`create_app_task` TO 'mysql_task_user'; GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`create_app_task_with_id` TO 'mysql_task_user'; GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`kill_app_task` TO 'mysql_task_user'; GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`execute_prepared_stmt_from_app_async` TO 'mysql_task_user'; GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`execute_prepared_stmt_async` TO 'mysql_task_user'; GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`start_task_monitor` TO 'mysql_task_user'; GRANT EXECUTE ON PROCEDURE `mysql_tasks`.`stop_task_monitor` TO 'mysql_task_user'; GRANT EXECUTE ON FUNCTION `mysql_tasks`.`quote_identifier` TO 'mysql_task_user'; GRANT SELECT ON `performance_schema`.`events_statements_current` TO 'mysql_task_user'; # ############################################################################# # MSM Section 190: Removal of Helpers # ----------------------------------------------------------------------------- # Removal of optional helper PROCEDUREs and FUNCTIONs that are called during # the creation of the database schema. Note that DROP IF EXISTS needs to be # used. # ----------------------------------------------------------------------------- # Drop optional helper PROCEDUREs and FUNCTIONs here # ############################################################################# # MSM Section 910: Database Schema Version # ----------------------------------------------------------------------------- # Setting the correct database schema version. # ----------------------------------------------------------------------------- CREATE OR REPLACE SQL SECURITY INVOKER VIEW `mysql_tasks`.`msm_schema_version` ( `major`,`minor`,`patch`) AS SELECT 3, 0, 0; # ############################################################################# # MSM Section 920: Server Variable Restoration # ----------------------------------------------------------------------------- # Restore the modified server variables to their original state. # ----------------------------------------------------------------------------- SET SQL_MODE=@OLD_SQL_MODE; SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS; SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS;