common/util/util_worker.c (108 lines of code) (raw):

#include <zephyr.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include "util_worker.h"
#include "cmsis_os2.h"

/* Size of the stack given to the thread that services the work queue. */
#define WORKER_STACK_SIZE 10000
/* NOTE(review): this is a CMSIS-RTOS2 priority constant that is handed
 * directly to k_work_queue_start(), which expects a Zephyr thread priority.
 * Confirm the numeric value is the intended one.
 */
#define WORKER_PRIORITY osPriorityBelowNormal
/* Upper bound on the number of jobs that may be pending or running at once. */
#define MAX_WORK_COUNT 32
/* A job that runs longer than this many milliseconds triggers a warning. */
#define WARN_WORK_PROC_TIME_MS 1000

K_THREAD_STACK_DEFINE(worker_stack_area, WORKER_STACK_SIZE);
static struct k_work_q worker_work_q;
/* Protects work_count. */
static struct k_mutex mutex_use_count;
/* Number of jobs accepted by add_work() whose handler has not finished yet. */
static uint8_t work_count;

/* Heap-allocated bookkeeping for one job.
 *
 * Allocated in add_work() and released by work_handler() once the job has
 * run (or by add_work() itself when queueing fails).
 */
typedef struct {
	/* Kernel work item; which member is used depends on whether the job
	 * was submitted with a delay.
	 */
	union {
		struct k_work normal_work;
		struct k_work_delayable delay_work;
	} work;
	/* User callback and the two arguments passed to it. */
	void (*fn)(void *, uint32_t);
	void *ptr_arg;
	uint32_t ui32_arg;
	/* Job name, only used in the "processing time too long" warning. */
	char name[MAX_WORK_NAME_LEN];
} work_info;

/* Work queue callback shared by immediate and delayed jobs.
 *
 * Runs the user function, warns if it took longer than
 * WARN_WORK_PROC_TIME_MS, then decrements work_count and frees the job.
 *
 * @param item kernel work item embedded in a work_info
 */
static void work_handler(struct k_work *item)
{
	/* The union is the first thing the kernel work item lives in; for a
	 * delayed job this relies on struct k_work being the first member of
	 * Zephyr's struct k_work_delayable, so both variants start at the
	 * address of work_info.work.
	 */
	work_info *work_job = CONTAINER_OF(item, work_info, work);

	if (work_job->fn == NULL) {
		printk("work_handler function is null\n");
	} else {
		int64_t fn_start_time = k_uptime_get();

		work_job->fn(work_job->ptr_arg, work_job->ui32_arg);

		int64_t elapsed_ms = k_uptime_get() - fn_start_time;

		/* Processing time too long, print warning message */
		if (elapsed_ms > WARN_WORK_PROC_TIME_MS) {
			printk("WARN: work %s Processing time too long, %lld ms\n", work_job->name,
			       (long long)elapsed_ms);
		}
	}

	/* Wait without a timeout: giving up here would skip the decrement and
	 * leave work_count permanently too high, until add_work() rejects every
	 * job as "queue full". The only other holder of this mutex is
	 * add_work(), which performs only non-blocking kernel calls while
	 * holding it.
	 */
	(void)k_mutex_lock(&mutex_use_count, K_FOREVER);
	work_count--;
	k_mutex_unlock(&mutex_use_count);

	free(work_job);
}

/* Get number of works in worker now.
 *
 * The counter is read without taking the mutex, so the value is a snapshot
 * that may already be stale when the caller uses it.
 *
 * @retval number of works
 */
uint8_t get_work_count(void)
{
	return work_count;
}

/* Attempt to add new work to worker.
 *
 * The contents of job are copied, so the caller's structure does not need to
 * outlive this call. A job with delay_ms == 0 is queued immediately; any
 * other value schedules it after that many milliseconds.
 *
 * @param job pointer to the worker_job to be added
 *
 * @retval 1 if successfully queued.
 * @retval -1 if work queue is full.
 * @retval -2 if memory allocation fail.
 * @retval -3 if mutex lock fail.
 * @retval -4 if job is NULL.
 * @retval other the value returned by k_work_submit_to_queue() or
 *         k_work_schedule_for_queue() when the kernel did not queue the work.
 */
int add_work(worker_job *job)
{
	int ret;
	work_info *new_job;

	if (job == NULL) {
		printk("add_work job is null\n");
		return -4;
	}

	/* Cheap early rejection that avoids an allocation; the authoritative
	 * check is repeated below while holding the mutex.
	 */
	if (work_count >= MAX_WORK_COUNT) {
		printk("add_work work queue full\n");
		return -1;
	}

	/* calloc leaves name as an empty string when job->name is NULL. */
	new_job = calloc(1, sizeof(*new_job));
	if (new_job == NULL) {
		printk("add_work malloc fail\n");
		return -2;
	}

	new_job->fn = job->fn;
	new_job->ptr_arg = job->ptr_arg;
	new_job->ui32_arg = job->ui32_arg;
	if (job->name != NULL) {
		/* snprintf truncates over-long names and always terminates. */
		snprintf(new_job->name, sizeof(new_job->name), "%s", job->name);
	}

	if (k_mutex_lock(&mutex_use_count, K_MSEC(1000))) {
		printk("add_work mutex lock fail\n");
		free(new_job);
		return -3;
	}

	/* Re-check under the lock so that concurrent callers cannot push the
	 * number of outstanding jobs past MAX_WORK_COUNT.
	 */
	if (work_count >= MAX_WORK_COUNT) {
		printk("add_work work queue full\n");
		ret = -1;
		goto error;
	}

	if (job->delay_ms == 0) {
		/* no need to be delayed */
		k_work_init(&new_job->work.normal_work, work_handler);
		ret = k_work_submit_to_queue(&worker_work_q, &new_job->work.normal_work);
	} else {
		/* need to be delayed */
		k_work_init_delayable(&new_job->work.delay_work, work_handler);
		ret = k_work_schedule_for_queue(&worker_work_q, &new_job->work.delay_work,
						K_MSEC(job->delay_ms));
	}

	if (ret != 1) {
		/* queued fail */
		printk("add_work add work to queue fail\n");
		goto error;
	}

	/* The handler needs this mutex before it can decrement, so it cannot
	 * observe the counter before this increment.
	 */
	work_count++;
	k_mutex_unlock(&mutex_use_count);
	return ret;

error:
	k_mutex_unlock(&mutex_use_count);
	free(new_job);
	return ret;
}

/* Initialize worker
 *
 * Should call this function to initialize worker before use other APIs.
 * This function initialize a workqueue and mutex.
 */
void init_worker(void)
{
	/* Set up the mutex before the queue thread exists, so no handler can
	 * ever touch an uninitialized mutex.
	 */
	k_mutex_init(&mutex_use_count);

	k_work_queue_start(&worker_work_q, worker_stack_area,
			   K_THREAD_STACK_SIZEOF(worker_stack_area), WORKER_PRIORITY, NULL);
	k_thread_name_set(&worker_work_q.thread, "util_worker");
}