reimplement work_queue_s

Reimplement work_queue_s as an abstraction layer for QueueHandle_t.
This commit is contained in:
tgotic 2022-12-02 11:39:40 +01:00
parent bb2cbeea94
commit fdfa6d9a67

View File

@ -30,11 +30,15 @@ typedef struct work_item_s {
void *context;
} work_item_t;
typedef struct work_queue_s {
QueueHandle_t queue;
} work_queue_t;
struct osi_thread_s {
TaskHandle_t task; /*!< Store the task object */
atomic_uintptr_t signal; /*!< task thread signal */
uint8_t work_queue_num; /*!< Number of work queues */
QueueHandle_t work_queues[]; /*!< variable length queue array */
work_queue_t work_queues[]; /*!< variable length queue array */
};
struct osi_event {
@ -47,14 +51,21 @@ struct osi_event {
static const size_t DEFAULT_WORK_QUEUE_CAPACITY = 100;
#define DEFAULT_JOIN_TIMEOUT_MS 1000U
static QueueHandle_t osi_work_queue_create(size_t capacity)
static bool osi_work_queue_create(work_queue_t* work_queue, size_t capacity)
{
return xQueueCreate(capacity, sizeof(work_item_t));
work_queue->queue = xQueueCreate(capacity, sizeof(work_item_t));
if (work_queue->queue) {
return true;
}
return false;
}
static void osi_work_queue_delete(QueueHandle_t queue)
static void osi_work_queue_delete(work_queue_t* work_queue)
{
vQueueDelete(queue);
if (work_queue->queue) {
vQueueDelete(work_queue->queue);
work_queue->queue = NULL;
}
}
static uint32_t osi_thread_task_take(uint32_t timeout_ms)
@ -69,12 +80,12 @@ static void osi_thread_task_give(TaskHandle_t handle)
}
}
static bool osi_thead_work_queue_get(QueueHandle_t queue, work_item_t *item)
static bool osi_thead_work_queue_get(work_queue_t* work_queue, work_item_t *item)
{
assert (queue != NULL);
assert (work_queue->queue != NULL);
assert (item != NULL);
if (xQueueReceive(queue, item, 0) == pdTRUE) {
if (xQueueReceive(work_queue->queue, item, 0) == pdTRUE) {
OSI_TRACE_VERBOSE("queue item received");
return true;
}
@ -82,22 +93,22 @@ static bool osi_thead_work_queue_get(QueueHandle_t queue, work_item_t *item)
return false;
}
static bool osi_thead_work_queue_put(QueueHandle_t queue, const work_item_t *item, uint32_t timeout_ms)
static bool osi_thead_work_queue_put(work_queue_t* work_queue, const work_item_t *item, uint32_t timeout_ms)
{
assert (queue != NULL);
assert (work_queue->queue != NULL);
assert (item != NULL);
if (xQueueSend(queue, item, osi_ms_to_ticks(timeout_ms)) == pdTRUE) {
if (xQueueSend(work_queue->queue, item, osi_ms_to_ticks(timeout_ms)) == pdTRUE) {
return true;
}
return false;
}
static size_t osi_thead_work_queue_len(QueueHandle_t queue)
static size_t osi_thead_work_queue_len(work_queue_t* work_queue)
{
assert (queue != NULL);
assert (work_queue->queue != NULL);
return (size_t)uxQueueMessagesWaiting(queue);
return (size_t)uxQueueMessagesWaiting(work_queue->queue);
}
static void osi_thread_run(void *arg)
@ -118,7 +129,7 @@ static void osi_thread_run(void *arg)
idx = 0;
// we should exit when thread->signal is set
while ((atomic_load(&thread->signal) == 0) && idx < thread->work_queue_num) {
if (osi_thead_work_queue_get(thread->work_queues[idx], &item) == true) {
if (osi_thead_work_queue_get(&thread->work_queues[idx], &item) == true) {
item.func(item.context);
idx = 0;
} else {
@ -154,10 +165,7 @@ static void osi_thread_free_internal(osi_thread_t *thread)
}
OSI_TRACE_DEBUG("freeing thread %p", thread);
for (uint8_t n = 0; n < thread->work_queue_num; n++) {
if (thread->work_queues[n]) {
osi_work_queue_delete(thread->work_queues[n]);
thread->work_queues[n] = NULL;
}
osi_work_queue_delete(&thread->work_queues[n]);
}
thread->work_queue_num = 0;
osi_free(thread);
@ -188,7 +196,7 @@ osi_thread_t *osi_thread_create(const char *name, size_t stack_size, int priorit
return NULL;
}
osi_thread_t *thread = (osi_thread_t *)osi_calloc(sizeof(osi_thread_t) + work_queue_num * sizeof(QueueHandle_t));
osi_thread_t *thread = (osi_thread_t *)osi_calloc(sizeof(osi_thread_t) + work_queue_num * sizeof(work_queue_t));
if (thread == NULL) {
OSI_TRACE_ERROR("thread alloc");
return NULL;
@ -197,8 +205,7 @@ osi_thread_t *osi_thread_create(const char *name, size_t stack_size, int priorit
for (uint8_t *i = &thread->work_queue_num; *i < work_queue_num; ++*i) {
size_t queue_len = work_queue_len[*i] ? work_queue_len[*i] : DEFAULT_WORK_QUEUE_CAPACITY;
thread->work_queues[*i] = osi_work_queue_create(queue_len);
if (thread->work_queues[*i] == NULL) {
if (osi_work_queue_create(&thread->work_queues[*i], queue_len) == false) {
OSI_TRACE_ERROR("thread work queue[%u] create", *i);
goto _err;
}
@ -239,7 +246,7 @@ bool osi_thread_post(osi_thread_t *thread, osi_thread_func_t func, void *context
.context = context
};
if (osi_thead_work_queue_put(thread->work_queues[queue_idx], &item, timeout)) {
if (osi_thead_work_queue_put(&thread->work_queues[queue_idx], &item, timeout) == true) {
OSI_TRACE_VERBOSE("thread %p new item in work queue[%d]", thread, queue_idx);
osi_thread_task_give(thread->task);
return true;
@ -275,7 +282,7 @@ int osi_thread_queue_wait_size(osi_thread_t *thread, int wq_idx)
return 0;
}
return (int)osi_thead_work_queue_len(thread->work_queues[wq_idx]);
return (int)osi_thead_work_queue_len(&thread->work_queues[wq_idx]);
}
osi_event_t *osi_event_create(osi_thread_func_t func, void *context)