feat (pthread): reader-writer locks implementation

* Added implementation based on cond. variables
* Added unit tests

Closes https://github.com/espressif/esp-idf/issues/7411
This commit is contained in:
Jakob Hasse 2021-08-27 13:00:10 +08:00
parent 7df16fdf8f
commit 32e3444701
7 changed files with 586 additions and 25 deletions

View File

@ -1,11 +1,17 @@
idf_component_register(SRCS "pthread.c"
"pthread_cond_var.c"
"pthread_local_storage.c"
set(sources "pthread.c"
"pthread_cond_var.c"
"pthread_local_storage.c"
"pthread_rwlock.c")
idf_component_register(SRCS ${sources}
INCLUDE_DIRS include)
idf_build_set_property(COMPILE_DEFINITIONS "-D_POSIX_READER_WRITER_LOCKS" APPEND)
set(extra_link_flags "-u pthread_include_pthread_impl")
list(APPEND extra_link_flags "-u pthread_include_pthread_cond_impl")
list(APPEND extra_link_flags "-u pthread_include_pthread_local_storage_impl")
list(APPEND extra_link_flags "-u pthread_include_pthread_rwlock_impl")
if(CONFIG_FREERTOS_ENABLE_STATIC_TASK_CLEAN_UP)
target_link_libraries(${COMPONENT_LIB} INTERFACE "-Wl,--wrap=vPortCleanUpTCB")

View File

@ -17,3 +17,6 @@ endif
COMPONENT_ADD_LDFLAGS += -u pthread_include_pthread_impl
COMPONENT_ADD_LDFLAGS += -u pthread_include_pthread_cond_impl
COMPONENT_ADD_LDFLAGS += -u pthread_include_pthread_local_storage_impl
COMPONENT_ADD_LDFLAGS += -u pthread_include_pthread_rwlock_impl
CFLAGS += -D_POSIX_READER_WRITER_LOCKS

View File

@ -1,20 +1,8 @@
// Copyright 2018 Espressif Systems (Shanghai) PTE LTD
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This module implements pthread API on top of FreeRTOS. API is implemented to the level allowing
// libstdcxx threading framework to operate correctly. So not all original pthread routines are supported.
//
/*
* SPDX-FileCopyrightText: 2018-2021 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <time.h>
#include <errno.h>
@ -64,9 +52,8 @@ typedef struct {
int type; ///< Mutex type. Currently supported PTHREAD_MUTEX_NORMAL and PTHREAD_MUTEX_RECURSIVE
} esp_pthread_mutex_t;
static SemaphoreHandle_t s_threads_mux = NULL;
portMUX_TYPE pthread_lazy_init_lock = portMUX_INITIALIZER_UNLOCKED; // Used for mutexes and cond vars
portMUX_TYPE pthread_lazy_init_lock = portMUX_INITIALIZER_UNLOCKED; // Used for mutexes and cond vars and rwlocks
static SLIST_HEAD(esp_thread_list_head, esp_pthread_entry) s_threads_list
= SLIST_HEAD_INITIALIZER(s_threads_list);
static pthread_key_t s_pthread_cfg_key;

View File

@ -0,0 +1,260 @@
/*
* SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <time.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <stdatomic.h>
#include "esp_err.h"
#include "esp_attr.h"
#include "sys/queue.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/semphr.h"
#include "soc/soc_memory_layout.h"
#include "pthread_internal.h"
#include "esp_pthread.h"
#include "esp_log.h"
const static char *TAG = "pthread_rw_lock";
/** pthread rw_mutex FreeRTOS wrapper */
typedef struct {
/**
*
*/
pthread_cond_t cv;
pthread_mutex_t resource_mutex;
/**
* Number of current readers holding this lock, negative number means waiting readers
*/
int8_t active_readers;
uint8_t active_writers;
uint8_t waiting_writers;
} esp_pthread_rwlock_t;
#define WRITER_QUEUE_SIZE 4
#define READER_QUEUE_SIZE 4
int pthread_rwlock_init (pthread_rwlock_t *rwlock,
const pthread_rwlockattr_t *attr)
{
int result;
if (!rwlock) {
return EINVAL;
}
if (attr) {
// TODO: implement attributes in IDF-4284
return ENOSYS;
}
esp_pthread_rwlock_t *esp_rwlock = (esp_pthread_rwlock_t*) calloc(1, sizeof(esp_pthread_rwlock_t));
if (esp_rwlock == NULL) {
return ENOMEM;
}
result = pthread_mutex_init(&esp_rwlock->resource_mutex, NULL);
if (result != 0) {
free(esp_rwlock);
return ENOMEM;
}
result = pthread_cond_init(&esp_rwlock->cv, NULL);
if (result != 0) {
pthread_mutex_destroy(&esp_rwlock->resource_mutex);
free(esp_rwlock);
return ENOMEM;
}
esp_rwlock->active_readers = 0;
esp_rwlock->active_writers = 0;
esp_rwlock->waiting_writers = 0;
*rwlock = (pthread_rwlock_t) esp_rwlock;
return 0;
}
static int pthread_rwlock_init_if_static(pthread_rwlock_t *rwlock)
{
int res = 0;
if ((intptr_t) *rwlock == PTHREAD_RWLOCK_INITIALIZER) {
portENTER_CRITICAL(&pthread_lazy_init_lock);
if ((intptr_t) *rwlock == PTHREAD_RWLOCK_INITIALIZER) {
res = pthread_rwlock_init(rwlock, NULL);
}
portEXIT_CRITICAL(&pthread_lazy_init_lock);
}
return res;
}
int pthread_rwlock_destroy (pthread_rwlock_t *rwlock)
{
esp_pthread_rwlock_t *esp_rwlock;
ESP_LOGV(TAG, "%s %p", __FUNCTION__, rwlock);
if (!rwlock) {
return EINVAL;
}
if ((intptr_t) *rwlock == PTHREAD_RWLOCK_INITIALIZER) {
return 0; // Static rwlock was never initialized
}
esp_rwlock = (esp_pthread_rwlock_t *)*rwlock;
if (esp_rwlock == NULL) {
return EINVAL;
}
// TODO: necessary?
pthread_mutex_lock(&esp_rwlock->resource_mutex);
if (esp_rwlock->active_readers != 0 || esp_rwlock->active_writers > 0 || esp_rwlock->waiting_writers > 0) {
pthread_mutex_unlock(&esp_rwlock->resource_mutex);
return EBUSY;
}
// delete whole lock
pthread_cond_destroy(&esp_rwlock->cv);
pthread_mutex_unlock(&esp_rwlock->resource_mutex);
pthread_mutex_destroy(&esp_rwlock->resource_mutex);
free(esp_rwlock);
return 0;
}
static int checkrw_lock(pthread_rwlock_t *rwlock)
{
esp_pthread_rwlock_t *esp_rwlock;
int res;
if (rwlock == NULL) {
return EINVAL;
}
res = pthread_rwlock_init_if_static(rwlock);
if (res != 0) {
return res;
}
esp_rwlock = (esp_pthread_rwlock_t *)*rwlock;
if (esp_rwlock == NULL) {
return EINVAL;
}
return 0;
}
int pthread_rwlock_rdlock (pthread_rwlock_t *rwlock)
{
esp_pthread_rwlock_t *esp_rwlock;
int res;
res = checkrw_lock(rwlock);
if (res != 0) {
return res;
}
esp_rwlock = (esp_pthread_rwlock_t *)*rwlock;
res = pthread_mutex_lock(&esp_rwlock->resource_mutex);
if (res != 0) {
return res;
}
if (esp_rwlock->active_writers == 0) {
esp_rwlock->active_readers++;
} else {
while (true) {
pthread_cond_wait(&esp_rwlock->cv, &esp_rwlock->resource_mutex);
if (esp_rwlock->active_writers == 0 && esp_rwlock->waiting_writers == 0) {
esp_rwlock->active_readers++;
break;
}
}
}
pthread_mutex_unlock(&esp_rwlock->resource_mutex);
return 0;
}
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock)
{
esp_pthread_rwlock_t *esp_rwlock;
int res;
res = checkrw_lock(rwlock);
if (res != 0) {
return res;
}
esp_rwlock = (esp_pthread_rwlock_t *)*rwlock;
res = pthread_mutex_lock(&esp_rwlock->resource_mutex);
if (res != 0) {
return res;
}
esp_rwlock->waiting_writers++;
while (esp_rwlock->active_readers > 0 || esp_rwlock->active_writers > 0) {
pthread_cond_wait(&esp_rwlock->cv, &esp_rwlock->resource_mutex);
}
esp_rwlock->waiting_writers--;
esp_rwlock->active_writers++;
pthread_mutex_unlock(&esp_rwlock->resource_mutex);
return 0;
}
int pthread_rwlock_unlock (pthread_rwlock_t *rwlock)
{
esp_pthread_rwlock_t *esp_rwlock;
int res;
res = checkrw_lock(rwlock);
if (res != 0) {
return res;
}
esp_rwlock = (esp_pthread_rwlock_t *)*rwlock;
res = pthread_mutex_lock(&esp_rwlock->resource_mutex);
if (res != 0) {
return res;
}
assert(!(esp_rwlock->active_readers > 0 && esp_rwlock->active_writers > 0));
if (esp_rwlock->active_readers > 0) {
// we are a reader
esp_rwlock->active_readers--;
if (esp_rwlock->active_readers == 0) {
pthread_cond_broadcast(&esp_rwlock->cv);
}
} else {
// we are a writer
esp_rwlock->active_writers = 0;
pthread_cond_broadcast(&esp_rwlock->cv);
}
pthread_mutex_unlock(&esp_rwlock->resource_mutex);
return 0;
}
/* Hook function to force linking this file */
void pthread_include_pthread_rwlock_impl(void)
{
}

View File

@ -1,3 +1,8 @@
idf_component_register(SRC_DIRS "."
PRIV_INCLUDE_DIRS "."
set(sources "test_pthread.c"
"test_pthread_cond_var.c"
"test_pthread_local_storage.c"
"test_pthread_cxx.cpp"
"test_pthread_rwlock.c")
idf_component_register(SRCS ${sources}
PRIV_REQUIRES cmock test_utils pthread)

View File

@ -0,0 +1,301 @@
/*
* SPDX-FileCopyrightText: 2021 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: CC0
*
* This example code is in the Public Domain (or CC0 licensed, at your option.)
*
* Unless required by applicable law or agreed to in writing, this
* software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied.
*/
#include "sdkconfig.h"
#include <errno.h>
#include <stdatomic.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/semphr.h"
#include "esp_timer.h"
#include "esp_pthread.h"
#include <pthread.h>
#include "unity.h"
TEST_CASE("pthread_rwlock_init invalid arg", "[pthread][rwlock]")
{
TEST_ASSERT_EQUAL_INT(pthread_rwlock_init(NULL, NULL), EINVAL);
}
TEST_CASE("pthread_rwlock_destroy invalid arg", "[pthread][rwlock]")
{
TEST_ASSERT_EQUAL_INT(pthread_rwlock_destroy(NULL), EINVAL);
pthread_rwlock_t rwlock = 0;
TEST_ASSERT_EQUAL_INT(pthread_rwlock_destroy(&rwlock), EINVAL);
}
TEST_CASE("create and destroy rwlock", "[pthread][rwlock]")
{
pthread_rwlock_t rwlock;
TEST_ASSERT_EQUAL_INT(pthread_rwlock_init(&rwlock, NULL), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_destroy(&rwlock), 0);
}
TEST_CASE("pthread_rwlock_destroy encounters static initializer", "[pthread][rwlock]")
{
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
TEST_ASSERT_EQUAL_INT(pthread_rwlock_destroy(&rwlock), 0);
}
TEST_CASE("rdlock invalid param", "[pthread][rwlock]")
{
TEST_ASSERT_EQUAL_INT(pthread_rwlock_rdlock(NULL), EINVAL);
pthread_rwlock_t rwlock = 0;
TEST_ASSERT_EQUAL_INT(pthread_rwlock_rdlock(&rwlock), EINVAL);
}
TEST_CASE("unlock invalid param", "[pthread][rwlock]")
{
TEST_ASSERT_EQUAL_INT(pthread_rwlock_unlock(NULL), EINVAL);
pthread_rwlock_t rwlock = 0;
TEST_ASSERT_EQUAL_INT(pthread_rwlock_unlock(&rwlock), EINVAL);
}
TEST_CASE("wrlock lock invalid param", "[pthread][rwlock]")
{
TEST_ASSERT_EQUAL_INT(pthread_rwlock_wrlock(NULL), EINVAL);
pthread_rwlock_t rwlock = 0;
TEST_ASSERT_EQUAL_INT(pthread_rwlock_wrlock(&rwlock), EINVAL);
}
TEST_CASE("rdlock lock statically initialized lock", "[pthread][rwlock]")
{
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
TEST_ASSERT_EQUAL_INT(pthread_rwlock_rdlock(&rwlock), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_unlock(&rwlock), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_destroy(&rwlock), 0);
}
TEST_CASE("rdlock unlock", "[pthread][rwlock]")
{
pthread_rwlock_t rwlock;
TEST_ASSERT_EQUAL_INT(pthread_rwlock_init(&rwlock, NULL), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_rdlock(&rwlock), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_unlock(&rwlock), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_destroy(&rwlock), 0);
}
TEST_CASE("multiple read locks", "[pthread][rwlock]")
{
pthread_rwlock_t rwlock;
TEST_ASSERT_EQUAL_INT(pthread_rwlock_init(&rwlock, NULL), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_rdlock(&rwlock), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_rdlock(&rwlock), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_unlock(&rwlock), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_unlock(&rwlock), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_destroy(&rwlock), 0);
}
TEST_CASE("wrlock lock-unlock", "[pthread][rwlock]")
{
pthread_rwlock_t rwlock;
TEST_ASSERT_EQUAL_INT(pthread_rwlock_init(&rwlock, NULL), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_wrlock(&rwlock), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_unlock(&rwlock), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_destroy(&rwlock), 0);
}
struct ReaderWriterArgs {
QueueHandle_t *wait_queue;
size_t sem_wait_release_num;
pthread_rwlock_t *rwlock;
volatile bool reading;
volatile bool writing;
};
static void *reader(void *arg)
{
uint8_t dummy_message;
struct ReaderWriterArgs *rw_args = (struct ReaderWriterArgs*) arg;
TEST_ASSERT_EQUAL(xQueueReceive(*(rw_args->wait_queue), &dummy_message, portMAX_DELAY), pdTRUE);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_rdlock(rw_args->rwlock), 0);
rw_args->reading = true;
TEST_ASSERT_FALSE(rw_args->writing);
rw_args->reading = false;
TEST_ASSERT_EQUAL_INT(pthread_rwlock_unlock(rw_args->rwlock), 0);
return NULL;
}
static void *writer(void *arg)
{
uint8_t dummy_msg;
struct ReaderWriterArgs *rw_args = (struct ReaderWriterArgs*) arg;
TEST_ASSERT_EQUAL_INT(pthread_rwlock_wrlock(rw_args->rwlock), 0);
rw_args->writing = true;
for (size_t i = 0; i < rw_args->sem_wait_release_num; i++) {
TEST_ASSERT_EQUAL(xQueueSendToBack(*(rw_args->wait_queue), &dummy_msg, portMAX_DELAY), pdTRUE);
}
TEST_ASSERT_FALSE(rw_args->reading);
vTaskDelay(20 / portTICK_PERIOD_MS);
TEST_ASSERT_FALSE(rw_args->reading);
rw_args->writing = false;
TEST_ASSERT_EQUAL_INT(pthread_rwlock_unlock(rw_args->rwlock), 0);
return NULL;
}
TEST_CASE("wrlock reader waits", "[pthread][rwlock]")
{
QueueHandle_t wait_queue;
pthread_rwlock_t rwlock;
pthread_t reader_thread;
pthread_t writer_thread;
struct ReaderWriterArgs rw_args;
wait_queue = xQueueCreate(1, 1);
TEST_ASSERT(wait_queue);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_init(&rwlock, NULL), 0);
rw_args.wait_queue = &wait_queue;
rw_args.sem_wait_release_num = 1;
rw_args.rwlock = &rwlock;
rw_args.writing = false;
rw_args.reading = false;
TEST_ASSERT_EQUAL(pthread_create(&reader_thread, NULL, reader, &rw_args), 0);
TEST_ASSERT_EQUAL(pthread_create(&writer_thread, NULL, writer, &rw_args), 0);
TEST_ASSERT_EQUAL(pthread_join(writer_thread, NULL), 0);
TEST_ASSERT_EQUAL(pthread_join(reader_thread, NULL), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_destroy(&rwlock), 0);
vQueueDelete(wait_queue);
}
TEST_CASE("wrlock multiple readers wait", "[pthread][rwlock]")
{
static const size_t THREAD_NUM = 4;
QueueHandle_t wait_queue;
pthread_rwlock_t rwlock;
pthread_t reader_thread[THREAD_NUM];
pthread_t writer_thread;
struct ReaderWriterArgs rw_args;
wait_queue = xQueueCreate(THREAD_NUM, 1);
TEST_ASSERT(wait_queue);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_init(&rwlock, NULL), 0);
rw_args.wait_queue = &wait_queue;
rw_args.sem_wait_release_num = THREAD_NUM;
rw_args.rwlock = &rwlock;
rw_args.writing = false;
rw_args.reading = false;
for (size_t i = 0; i < THREAD_NUM; i++) {
TEST_ASSERT_EQUAL(pthread_create(&(reader_thread[i]), NULL, reader, &rw_args), 0);
}
TEST_ASSERT_EQUAL(pthread_create(&writer_thread, NULL, writer, &rw_args), 0);
TEST_ASSERT_EQUAL(pthread_join(writer_thread, NULL), 0);
for (size_t i = 0; i < THREAD_NUM; i++) {
TEST_ASSERT_EQUAL(pthread_join(reader_thread[i], NULL), 0);
}
TEST_ASSERT_EQUAL_INT(pthread_rwlock_destroy(&rwlock), 0);
vQueueDelete(wait_queue);
}
static void *writer2(void *arg)
{
uint8_t dummy_msg;
struct ReaderWriterArgs *rw_args = (struct ReaderWriterArgs*) arg;
TEST_ASSERT_EQUAL(xQueueReceive(*(rw_args->wait_queue), &dummy_msg, portMAX_DELAY), pdTRUE);
TEST_ASSERT_TRUE(rw_args->writing);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_wrlock(rw_args->rwlock), 0);
TEST_ASSERT_FALSE(rw_args->writing);
rw_args->writing = true;
vTaskDelay(10 / portTICK_PERIOD_MS);
rw_args->writing = false;
TEST_ASSERT_EQUAL_INT(pthread_rwlock_unlock(rw_args->rwlock), 0);
return NULL;
}
TEST_CASE("wrlock writer waits", "[pthread][rwlock]")
{
QueueHandle_t wait_queue;
pthread_rwlock_t rwlock;
pthread_t writer_thread;
pthread_t writer_2_thread;
struct ReaderWriterArgs rw_args;
wait_queue = xQueueCreate(1, 1);
TEST_ASSERT(wait_queue);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_init(&rwlock, NULL), 0);
rw_args.wait_queue = &wait_queue;
rw_args.sem_wait_release_num = 1;
rw_args.rwlock = &rwlock;
rw_args.writing = false;
rw_args.reading = false;
TEST_ASSERT_EQUAL(pthread_create(&writer_2_thread, NULL, writer2, &rw_args), 0);
TEST_ASSERT_EQUAL(pthread_create(&writer_thread, NULL, writer, &rw_args), 0);
TEST_ASSERT_EQUAL(pthread_join(writer_thread, NULL), 0);
TEST_ASSERT_EQUAL(pthread_join(writer_2_thread, NULL), 0);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_destroy(&rwlock), 0);
vQueueDelete(wait_queue);
}
TEST_CASE("wrlock multiple writers wait", "[pthread][rwlock]")
{
static const size_t THREAD_NUM = 4;
QueueHandle_t wait_queue;
pthread_rwlock_t rwlock;
pthread_t writer_thread;
pthread_t writer_2_thread[THREAD_NUM];
struct ReaderWriterArgs rw_args;
wait_queue = xQueueCreate(THREAD_NUM, 1);
TEST_ASSERT(wait_queue);
TEST_ASSERT_EQUAL_INT(pthread_rwlock_init(&rwlock, NULL), 0);
rw_args.wait_queue = &wait_queue;
rw_args.sem_wait_release_num = THREAD_NUM;
rw_args.rwlock = &rwlock;
rw_args.writing = false;
rw_args.reading = false;
for (size_t i = 0; i < THREAD_NUM; i++) {
TEST_ASSERT_EQUAL(pthread_create(&writer_2_thread[i], NULL, writer2, &rw_args), 0);
}
TEST_ASSERT_EQUAL(pthread_create(&writer_thread, NULL, writer, &rw_args), 0);
TEST_ASSERT_EQUAL(pthread_join(writer_thread, NULL), 0);
for (size_t i = 0; i < THREAD_NUM; i++) {
TEST_ASSERT_EQUAL(pthread_join(writer_2_thread[i], NULL), 0);
}
TEST_ASSERT_EQUAL_INT(pthread_rwlock_destroy(&rwlock), 0);
vQueueDelete(wait_queue);
}

View File

@ -1832,7 +1832,6 @@ components/protocomm/src/transports/protocomm_console.c
components/protocomm/src/transports/protocomm_httpd.c
components/protocomm/test/test_protocomm.c
components/pthread/include/esp_pthread.h
components/pthread/pthread.c
components/pthread/pthread_cond_var.c
components/pthread/pthread_internal.h
components/pthread/pthread_local_storage.c