From 2f46d9686dc39b1e446118ed856e8c6570c35f7b Mon Sep 17 00:00:00 2001 From: Euripedes Rocha Date: Thu, 4 May 2023 14:22:17 +0200 Subject: [PATCH] feat(mqtt/example): Adds custom outbox example Adds example presenting how to customize esp-mqtt outbox. --- examples/protocols/.build-test-rules.yml | 6 + .../mqtt/custom_outbox/CMakeLists.txt | 18 + .../protocols/mqtt/custom_outbox/README.md | 100 +++++ .../mqtt/custom_outbox/main/CMakeLists.txt | 3 + .../mqtt/custom_outbox/main/Kconfig.projbuild | 13 + .../mqtt/custom_outbox/main/app_main.c | 173 ++++++++ .../mqtt/custom_outbox/main/custom_outbox.cpp | 393 ++++++++++++++++++ .../mqtt/custom_outbox/main/idf_component.yml | 3 + .../mqtt/custom_outbox/sdkconfig.defaults | 2 + 9 files changed, 711 insertions(+) create mode 100644 examples/protocols/mqtt/custom_outbox/CMakeLists.txt create mode 100644 examples/protocols/mqtt/custom_outbox/README.md create mode 100644 examples/protocols/mqtt/custom_outbox/main/CMakeLists.txt create mode 100644 examples/protocols/mqtt/custom_outbox/main/Kconfig.projbuild create mode 100644 examples/protocols/mqtt/custom_outbox/main/app_main.c create mode 100644 examples/protocols/mqtt/custom_outbox/main/custom_outbox.cpp create mode 100644 examples/protocols/mqtt/custom_outbox/main/idf_component.yml create mode 100644 examples/protocols/mqtt/custom_outbox/sdkconfig.defaults diff --git a/examples/protocols/.build-test-rules.yml b/examples/protocols/.build-test-rules.yml index 5edb79bc0b..2b0b52480c 100644 --- a/examples/protocols/.build-test-rules.yml +++ b/examples/protocols/.build-test-rules.yml @@ -143,6 +143,12 @@ examples/protocols/modbus: temporary: true reason: not supported on p4 # TODO: IDF-7869 +examples/protocols/mqtt/custom_outbox: + disable: + - if: IDF_TARGET == "esp32p4" + temporary: true + reason: not supported on p4 # TODO: IDF-8077 + examples/protocols/mqtt/ssl: disable: - if: IDF_TARGET == "esp32p4" diff --git a/examples/protocols/mqtt/custom_outbox/CMakeLists.txt b/examples/protocols/mqtt/custom_outbox/CMakeLists.txt new file mode 100644 index 0000000000..4823b13388 --- /dev/null +++ b/examples/protocols/mqtt/custom_outbox/CMakeLists.txt @@ -0,0 +1,18 @@ +# The following four lines of boilerplate have to be in your project's CMakeLists +# in this exact order for cmake to work correctly +cmake_minimum_required(VERSION 3.16) + +include($ENV{IDF_PATH}/tools/cmake/project.cmake) +project(mqtt_tcp_custom_outbox) + +# Add custom outbox implementation to mqtt component +idf_component_get_property(mqtt mqtt COMPONENT_LIB) +target_sources(${mqtt} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/main/custom_outbox.cpp) + +# Our C++ needs an extra dependency to mqtt component, so we add it to mqtt component. +# This is needed because we are adding another source to the mqtt component and the build +# system needs to be aware of it to be able to compile and link the mqtt component. +# First we get our dependency +idf_component_get_property(pthread pthread COMPONENT_LIB) +# And them we link the components +target_link_libraries(${mqtt} ${pthread}) diff --git a/examples/protocols/mqtt/custom_outbox/README.md b/examples/protocols/mqtt/custom_outbox/README.md new file mode 100644 index 0000000000..29a31425b9 --- /dev/null +++ b/examples/protocols/mqtt/custom_outbox/README.md @@ -0,0 +1,100 @@ +| Supported Targets | ESP32 | ESP32-C2 | ESP32-C3 | ESP32-C6 | ESP32-H2 | ESP32-S2 | ESP32-S3 | +| ----------------- | ----- | -------- | -------- | -------- | -------- | -------- | -------- | + +# ESP-MQTT custom outbox sample application +(See the README.md file in the upper level 'examples' directory for more information about examples.) + +This example is a slightly modified version of the tcp example to show how to configure a custom outbox. +This example connects to the broker URI selected using `idf.py menuconfig` (using mqtt tcp transport) and as a demonstration subscribes/unsubscribes and send a message on certain topic. +(Please note that the public broker is maintained by the community so may not be always available, for details please see this [disclaimer](https://iot.eclipse.org/getting-started/#sandboxes)) + +Note: If the URI equals `FROM_STDIN` then the broker address is read from stdin upon application startup (used for testing) + +It uses ESP-MQTT library which implements mqtt client to connect to mqtt broker. + +## Necessary changes to customize the outbox + +To customize the outbox the first step is to enable it in the menuconfig option. + +With this option enabled, the default implementation isn't defined and the function definition needs to be added to mqtt component. +Any extra dependencies needed by the new sources also need to be added to the mqtt component. Refer to the example CMakeLists.txt file +for the details on how to do it. + +## The custom outbox in the example + +For the sake of this example the customized outbox implements the same functionalits of the regular but using C++ as a language. + +The implementation uses [C++ Polymorphic memory resources]() to control memory allocations and limit the usage of the memory. + +## How to use example + +### Hardware Required + +This example can be executed on any ESP32 board, the only required interface is WiFi and connection to internet. + +### Configure the project + +* Open the project configuration menu (`idf.py menuconfig`) +* Configure Wi-Fi or Ethernet under "Example Connection Configuration" menu. See "Establishing Wi-Fi or Ethernet Connection" section in [examples/protocols/README.md](../../README.md) for more details. + +Note that the mandatory configurations for this example, mqtt custom outbox and C++ exceptions are automatically added by the `sdkconfig.defaults` file. +### Build and Flash + +Build the project and flash it to the board, then run monitor tool to view serial output: + +``` +idf.py -p PORT flash monitor +``` + +(To exit the serial monitor, type ``Ctrl-]``.) + +See the Getting Started Guide for full steps to configure and use ESP-IDF to build projects. + +## Example Output + +``` +I (4635) example_common: Connected to example_netif_sta +I (4645) example_common: - IPv4 address: 192.168.33.206, +I (4645) example_common: - IPv6 address: fe80:0000:0000:0000:7e9e:bdff:fecf:00c0, type: ESP_IP6_ADDR_IS_LINK_LOCAL +I (4655) Monotonic: Monotonic: 400 bytes allocated, 400 total bytes in use +I (4665) Monotonic: Monotonic: 1000 bytes allocated, 1400 total bytes in use +I (4675) Monotonic: Monotonic: 128 bytes allocated, 1528 total bytes in use +I (4685) Pool: Pool: 32 bytes allocated, 32 total bytes in use +I (4685) Monotonic: Monotonic: 7688 bytes allocated, 9216 total bytes in use +I (4695) Monotonic: Monotonic: 128 bytes allocated, 9344 total bytes in use +I (4705) Pool: Pool: 480 bytes allocated, 512 total bytes in use +I (4715) Monotonic: Monotonic: 992 bytes allocated, 10336 total bytes in use +I (4715) Monotonic: Monotonic: 128 bytes allocated, 10464 total bytes in use +I (4725) Pool: Pool: 23 bytes allocated, 535 total bytes in use +I (4735) MQTT_EXAMPLE: Enqueued msg_id=14345 +I (4735) Pool: Pool: 29 bytes allocated, 564 total bytes in use +I (4745) MQTT_EXAMPLE: Enqueued msg_id=3507 +I (4745) MQTT_EXAMPLE: Other event id:7 +I (4755) main_task: Returned from app_main() +I (5085) MQTT_EXAMPLE: MQTT_EVENT_CONNECTED +I (5085) Pool: Pool: 23 bytes allocated, 587 total bytes in use +I (5085) MQTT_EXAMPLE: sent publish successful, msg_id=47425 +I (5085) Pool: Pool: 18 bytes allocated, 605 total bytes in use +I (5095) MQTT_EXAMPLE: sent subscribe successful, msg_id=60709 +I (5105) Pool: Pool: 18 bytes allocated, 623 total bytes in use +I (5105) MQTT_EXAMPLE: sent subscribe successful, msg_id=33273 +I (5395) Pool: Pool: 23 bytes deallocated, 623 total bytes in use +I (5395) MQTT_EXAMPLE: MQTT_EVENT_PUBLISHED, msg_id=47425 +I (6005) Pool: Pool: 18 bytes deallocated, 623 total bytes in use +I (6005) MQTT_EXAMPLE: MQTT_EVENT_SUBSCRIBED, msg_id=60709 +I (6005) MQTT_EXAMPLE: sent publish successful, msg_id=0 +I (6015) Pool: Pool: 18 bytes deallocated, 623 total bytes in use +I (6015) MQTT_EXAMPLE: MQTT_EVENT_SUBSCRIBED, msg_id=33273 +I (6025) MQTT_EXAMPLE: sent publish successful, msg_id=0 +I (6035) MQTT_EXAMPLE: MQTT_EVENT_DATA +TOPIC=/topic/qos1 +DATA=data_3 +I (6315) MQTT_EXAMPLE: MQTT_EVENT_DATA +TOPIC=/topic/qos1 +DATA=data_3 +I (6315) Pool: Pool: 23 bytes deallocated, 623 total bytes in use +I (6315) MQTT_EXAMPLE: MQTT_EVENT_PUBLISHED, msg_id=14345 +I (6615) MQTT_EXAMPLE: MQTT_EVENT_DATA +TOPIC=/topic/qos0 +DATA=data +``` diff --git a/examples/protocols/mqtt/custom_outbox/main/CMakeLists.txt b/examples/protocols/mqtt/custom_outbox/main/CMakeLists.txt new file mode 100644 index 0000000000..cb970c8d23 --- /dev/null +++ b/examples/protocols/mqtt/custom_outbox/main/CMakeLists.txt @@ -0,0 +1,3 @@ +idf_component_register(SRCS "app_main.c" + INCLUDE_DIRS "." + ) diff --git a/examples/protocols/mqtt/custom_outbox/main/Kconfig.projbuild b/examples/protocols/mqtt/custom_outbox/main/Kconfig.projbuild new file mode 100644 index 0000000000..c11539fb8d --- /dev/null +++ b/examples/protocols/mqtt/custom_outbox/main/Kconfig.projbuild @@ -0,0 +1,13 @@ +menu "Example Configuration" + + config BROKER_URL + string "Broker URL" + default "mqtt://mqtt.eclipseprojects.io" + help + URL of the broker to connect to + + config BROKER_URL_FROM_STDIN + bool + default y if BROKER_URL = "FROM_STDIN" + +endmenu diff --git a/examples/protocols/mqtt/custom_outbox/main/app_main.c b/examples/protocols/mqtt/custom_outbox/main/app_main.c new file mode 100644 index 0000000000..b30bc46aa4 --- /dev/null +++ b/examples/protocols/mqtt/custom_outbox/main/app_main.c @@ -0,0 +1,173 @@ +/* + * SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ +/* MQTT (over TCP) Example with custom outbox + + This example code is in the Public Domain (or CC0 licensed, at your option.) + + Unless required by applicable law or agreed to in writing, this + software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + CONDITIONS OF ANY KIND, either express or implied. +*/ + +#include +#include +#include +#include +#include "esp_system.h" +#include "nvs_flash.h" +#include "esp_event.h" +#include "esp_netif.h" +#include "protocol_examples_common.h" + +#include "esp_log.h" +#include "mqtt_client.h" + +static const char *TAG = "MQTT_EXAMPLE"; + + +static void log_error_if_nonzero(const char *message, int error_code) +{ + if (error_code != 0) { + ESP_LOGE(TAG, "Last error %s: 0x%x", message, error_code); + } +} + +/* + * @brief Event handler registered to receive MQTT events + * + * This function is called by the MQTT client event loop. + * + * @param handler_args user data registered to the event. + * @param base Event base for the handler(always MQTT Base in this example). + * @param event_id The id for the received event. + * @param event_data The data for the event, esp_mqtt_event_handle_t. + */ +static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) +{ + ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", base, event_id); + esp_mqtt_event_handle_t event = event_data; + esp_mqtt_client_handle_t client = event->client; + int msg_id; + switch ((esp_mqtt_event_id_t)event_id) { + case MQTT_EVENT_CONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); + msg_id = esp_mqtt_client_publish(client, "/topic/qos1", "data_3", 0, 1, 0); + ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); + + msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0); + ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); + + msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1); + ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); + + break; + case MQTT_EVENT_DISCONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); + break; + + case MQTT_EVENT_SUBSCRIBED: + ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); + msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0); + ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); + break; + case MQTT_EVENT_UNSUBSCRIBED: + ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); + break; + case MQTT_EVENT_PUBLISHED: + ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); + break; + case MQTT_EVENT_DATA: + ESP_LOGI(TAG, "MQTT_EVENT_DATA"); + printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); + printf("DATA=%.*s\r\n", event->data_len, event->data); + break; + case MQTT_EVENT_ERROR: + ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); + if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) { + log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err); + log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err); + log_error_if_nonzero("captured as transport's socket errno", event->error_handle->esp_transport_sock_errno); + ESP_LOGI(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno)); + + } + break; + default: + ESP_LOGI(TAG, "Other event id:%d", event->event_id); + break; + } +} + +static void mqtt_app_start(void) +{ + esp_mqtt_client_config_t mqtt_cfg = { + .broker.address.uri = CONFIG_BROKER_URL, + }; +#if CONFIG_BROKER_URL_FROM_STDIN + char line[128]; + + if (strcmp(mqtt_cfg.broker.address.uri, "FROM_STDIN") == 0) { + int count = 0; + printf("Please enter url of mqtt broker\n"); + while (count < 128) { + int c = fgetc(stdin); + if (c == '\n') { + line[count] = '\0'; + break; + } else if (c > 0 && c < 127) { + line[count] = c; + ++count; + } + vTaskDelay(10 / portTICK_PERIOD_MS); + } + mqtt_cfg.broker.address.uri = line; + printf("Broker url: %s\n", line); + } else { + ESP_LOGE(TAG, "Configuration mismatch: wrong broker url"); + abort(); + } +#endif /* CONFIG_BROKER_URL_FROM_STDIN */ + + esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); + /* The last argument may be used to pass data to the event handler, in this example mqtt_event_handler */ + esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL); + + /*Let's enqueue a few messages to the outbox to see the allocations*/ + int msg_id; + msg_id = esp_mqtt_client_enqueue(client, "/topic/qos1", "data_3", 0, 1, 0, true); + ESP_LOGI(TAG, "Enqueued msg_id=%d", msg_id); + msg_id = esp_mqtt_client_enqueue(client, "/topic/qos2", "QoS2 message", 0, 2, 0, true); + ESP_LOGI(TAG, "Enqueued msg_id=%d", msg_id); + + /* Now we start the client and it's possible to see the memory usage for the operations in the outbox. */ + esp_mqtt_client_start(client); +} + +void app_main(void) +{ + ESP_LOGI(TAG, "[APP] Startup.."); + ESP_LOGI(TAG, "[APP] Free memory: %" PRIu32 " bytes", esp_get_free_heap_size()); + ESP_LOGI(TAG, "[APP] IDF version: %s", esp_get_idf_version()); + + esp_log_level_set("*", ESP_LOG_INFO); + esp_log_level_set("mqtt_client", ESP_LOG_VERBOSE); + esp_log_level_set("MQTT_EXAMPLE", ESP_LOG_VERBOSE); + esp_log_level_set("TRANSPORT_BASE", ESP_LOG_VERBOSE); + esp_log_level_set("esp-tls", ESP_LOG_VERBOSE); + esp_log_level_set("TRANSPORT", ESP_LOG_VERBOSE); + esp_log_level_set("custom_outbox", ESP_LOG_VERBOSE); + + ESP_ERROR_CHECK(nvs_flash_init()); + ESP_ERROR_CHECK(esp_netif_init()); + ESP_ERROR_CHECK(esp_event_loop_create_default()); + + /* This helper function configures Wi-Fi or Ethernet, as selected in menuconfig. + * Read "Establishing Wi-Fi or Ethernet Connection" section in + * examples/protocols/README.md for more information about this function. + */ + ESP_ERROR_CHECK(example_connect()); + + mqtt_app_start(); +} diff --git a/examples/protocols/mqtt/custom_outbox/main/custom_outbox.cpp b/examples/protocols/mqtt/custom_outbox/main/custom_outbox.cpp new file mode 100644 index 0000000000..2db9d7e49d --- /dev/null +++ b/examples/protocols/mqtt/custom_outbox/main/custom_outbox.cpp @@ -0,0 +1,393 @@ +/* + * SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "esp_log.h" +#include "mqtt_outbox.h" + +constexpr auto TAG = "custom_outbox"; + +/* + * The trace resource class is created here as an example on how to build a custom memory resource + * The class is only needed to show where we are allocating from and to track allocations and deallocations. + */ +class trace_resource : public std::pmr::memory_resource { +public: + explicit trace_resource(std::string resource_name, std::pmr::memory_resource *upstream_resource = std::pmr::get_default_resource()) : upstream{upstream_resource}, name{std::move(resource_name)} {} + [[nodiscard]] std::string_view get_name() const noexcept + { + return std::string_view(name); + } + [[nodiscard]] auto upstream_resource() const + { + return upstream; + } +private: + void *do_allocate(std::size_t bytes, std::size_t alignment) override + { + auto *allocated = upstream->allocate(bytes, alignment); + allocated_total += bytes; + ESP_LOGI(name.c_str(), "%s: %zu bytes allocated, %zu total bytes in use", name.c_str(), bytes, allocated_total); + return allocated; + } + void do_deallocate(void *ptr, std::size_t bytes, std::size_t alignment) override + { + upstream->deallocate(ptr, bytes, alignment); + ESP_LOGI(name.c_str(), "%s: %zu bytes deallocated, %zu total bytes in use", name.c_str(), bytes, allocated_total); + } + + [[nodiscard]] bool do_is_equal(const std::pmr::memory_resource &other) const noexcept override + { + return this == &other; + } + size_t allocated_total{}; + std::pmr::memory_resource *upstream; + std::string name; +}; + +struct outbox_item { + /* Defining the allocator_type to let compiler know that our type is allocator aware, + * This way the allocator used for the outbox is propagated to the messages*/ + using allocator_type = std::pmr::polymorphic_allocator<>; + + /* Few strong types to diferetiate parameters*/ + enum class id_t : int {}; + enum class type_t : int {}; + enum class qos_t : int {}; + + /* Allocator aware constructors */ + outbox_item( + std::pmr::vector message, + id_t msg_id, + type_t msg_type, + qos_t msg_qos, + outbox_tick_t tick, + pending_state_t pending_state, + allocator_type alloc = {} + ) : message(std::move(message), alloc), id(msg_id), type(msg_type), qos(msg_qos), tick(tick), pending_state(pending_state) {} + + /*Copy and move constructors have an extra allocator parameter, for copy default and allocator aware are the same.*/ + outbox_item(const outbox_item &other, allocator_type alloc = {}) : message(other.message, alloc), id(other.id), type(other.type), qos(other.qos), tick(other.tick), pending_state(other.pending_state) {} + outbox_item(outbox_item &&other, allocator_type alloc) noexcept : message(std::move(other.message), alloc), id(other.id), type(other.type), qos(other.qos), tick(other.tick), pending_state(other.pending_state) + {} + + outbox_item(const outbox_item &) = default; + outbox_item(outbox_item &&other) = default; + outbox_item &operator=(const outbox_item &rhs) = default; + outbox_item &operator=(outbox_item &&other) = default; + ~outbox_item() = default; + + /* Getters to support outbox operation */ + [[nodiscard]] auto state() const noexcept + { + return pending_state; + } + + [[nodiscard]] allocator_type get_allocator() const + { + return message.get_allocator(); + } + + void set(pending_state state) noexcept + { + pending_state = state; + } + + void set(outbox_tick_t n_tick) noexcept + { + tick = n_tick; + } + + [[nodiscard]] auto get_id() const noexcept + { + return id; + } + + [[nodiscard]] auto get_type() const noexcept + { + return type; + } + + [[nodiscard]] auto get_tick() const noexcept + { + return tick; + } + + [[nodiscard]] auto get_data(size_t *len, uint16_t *msg_id, int *msg_type, int *msg_qos) + { + *len = message.size(); + *msg_id = static_cast(id); + *msg_type = static_cast(type); + *msg_qos = static_cast(qos); + return message.data(); + } + + [[nodiscard]] auto get_size() const noexcept + { + return message.size(); + } + +private: + std::pmr::vector message; + id_t id; + type_t type; + qos_t qos; + outbox_tick_t tick; + pending_state_t pending_state; +}; + +/* + * For the outbox_t we let the special member functions as default and + * we don't extend the allocator aware versions for the sake of the simplicity, since the operations are not needed in the usage. + */ +struct outbox_t { + using allocator_type = std::pmr::polymorphic_allocator<>; + explicit outbox_t(allocator_type alloc = {}) : queue(alloc) {} + + outbox_item_handle_t get(outbox_item::id_t msg_id) + { + if (auto item = std::ranges::find_if(queue, [msg_id](auto & item) { + return item.get_id() == msg_id; + }); + item != std::end(queue)) { + return &(*item); + } + return nullptr; + } + + int delete_expired(outbox_tick_t current_tick, outbox_tick_t timeout) + { + return std::erase_if(queue, [current_tick, timeout, this](const outbox_item & item) { + if (current_tick - item.get_tick() > timeout) { + total_size -= item.get_size(); + return true; + } + return false; + }); + } + + outbox_item::id_t delete_single_expired(outbox_tick_t current_tick, outbox_tick_t timeout) + { + if (auto erase = std::ranges::find_if(queue, [current_tick, timeout](auto & item) { + return (current_tick - item.get_tick() > timeout); + }); erase != std::end(queue)) { + auto msg_id = erase->get_id(); + total_size -= erase->get_size(); + queue.erase(erase); + return msg_id; + } + return outbox_item::id_t{-1}; + } + + auto erase(outbox_item_handle_t to_erase) + { + return erase_if([to_erase](auto & item) { + return &item == to_erase; + }); + } + + auto erase(outbox_item::id_t msg_id, outbox_item::type_t msg_type) + { + return erase_if([msg_id, msg_type](auto & item) { + return (item.get_id() == msg_id && (item.get_type() == msg_type)); + }); + } + + [[nodiscard]] auto size() const noexcept + { + return total_size; + } + + void clear() + { + queue.clear(); + } + + outbox_item_handle_t enqueue(outbox_message_handle_t message, outbox_tick_t tick) noexcept + { + try { + auto &item = + queue.emplace_back(std::pmr::vector {message->data, message->data + message->len}, + outbox_item::id_t{message->msg_id}, + outbox_item::type_t{message->msg_type}, + outbox_item::qos_t{message->msg_qos}, + tick, + QUEUED + ); + total_size += item.get_size(); + ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%" PRIu64, message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(this)); + return &item; + } catch (const std::exception &e) { + return nullptr; + } + } + + outbox_item_handle_t dequeue(pending_state_t state, outbox_tick_t *tick) + { + if (auto item = std::ranges::find_if(queue, [state](auto & item) { + return item.state() == state; + }); + item != std::end(queue)) { + if (tick != nullptr) { + *tick = item->get_tick(); + } + return &(*item); + } + return nullptr; + } + [[nodiscard]] allocator_type get_allocator() const + { + return queue.get_allocator(); + } +private: + [[nodiscard]] esp_err_t erase_if(std::predicate auto &&predicate) + { + if (auto to_erase = std::ranges::find_if(queue, predicate); to_erase != std::end(queue)) { + total_size -= to_erase->get_size(); + queue.erase(to_erase); + return ESP_OK; + } + return ESP_FAIL; + } + std::size_t total_size{}; + std::pmr::deque queue ; +}; + +extern "C" { + + outbox_handle_t outbox_init() + { + /* First we create a fixed size memory buffer to be used. */ + static constexpr auto work_memory_size = 16 * 1024; + static std::array resource_buffer{}; + try { + /* + * Since the outbox is managed by a C API we can't rely on C++ automatic cleanup and smart pointers but, on production code it would be better to add the + * memory resources to outbox_t, applying RAII principles, and make only outbox_item allocator aware. For the sake of the example we are keeping them + * separated to explictly show the relations. + * First we create the monotonic buffer and add null_memory_resource as upstream. This way if our working memory is exausted an exception is thrown. + */ + auto *monotonic_resource = new std::pmr::monotonic_buffer_resource{resource_buffer.data(), resource_buffer.size(), std::pmr::null_memory_resource()}; + /*Here we add our custom trace wrapper type to trace allocations and deallocations*/ + auto *trace_monotonic = new trace_resource("Monotonic", monotonic_resource); + + /* We compose monotonic buffer with pool resource, since the monotonic deallocate is a no-op and we need to remove messages to not go out of memory.*/ + auto *pool_resource = new std::pmr::unsynchronized_pool_resource{trace_monotonic}; + auto *trace_pool = new trace_resource("Pool", pool_resource); + /* Our outbox class is created using the trace_pool as memory resource */ + auto *outbox = new outbox_t{trace_pool}; + return outbox; + } catch (const std::exception &e) { + ESP_LOGD(TAG, "Not enough memory to construct the outbox, review the resource_buffer size"); + return nullptr; + + } + } + + outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick) + { + return outbox->enqueue(message, tick); + } + + outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id) + { + return outbox->get(outbox_item::id_t{msg_id}); + } + + outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick) + { + return outbox->dequeue(pending, tick); + } +} + +uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos) +{ + if (item == nullptr) { + return nullptr; + } + return item->get_data(len, msg_id, msg_type, qos); +} + +esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete) +{ + return outbox->erase(item_to_delete); + +} + +esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type) +{ + return outbox->erase(outbox_item::id_t{msg_id}, outbox_item::type_t{msg_type}); +} + +int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout) +{ + return static_cast(outbox->delete_single_expired(current_tick, timeout)); +} + +int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout) +{ + return outbox->delete_expired(current_tick, timeout); +} + +esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending) +{ + if (auto *item = outbox->get(outbox_item::id_t{msg_id}); item != nullptr) { + item->set(pending); + return ESP_OK; + } + return ESP_FAIL; +} + +pending_state_t outbox_item_get_pending(outbox_item_handle_t item) +{ + if (item != nullptr) { + return item->state(); + } + return QUEUED; +} + +esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick) +{ + if (auto *item = outbox->get(outbox_item::id_t{msg_id}); item != nullptr) { + item->set(tick); + return ESP_OK; + } + return ESP_FAIL; +} + +uint64_t outbox_get_size(outbox_handle_t outbox) +{ + return outbox->size(); +} + +void outbox_delete_all_items(outbox_handle_t outbox) +{ + outbox->clear(); +} + +void outbox_destroy(outbox_handle_t outbox) +{ + auto *trace_pool = static_cast(outbox->get_allocator().resource()); + auto *pool_resource = static_cast(trace_pool->upstream_resource()); + auto *trace_monotonic = static_cast(pool_resource->upstream_resource()); + auto *monotonic_resource = static_cast(trace_monotonic->upstream_resource()); + + delete monotonic_resource; + delete trace_monotonic; + delete pool_resource; + delete trace_pool; + delete outbox; +} diff --git a/examples/protocols/mqtt/custom_outbox/main/idf_component.yml b/examples/protocols/mqtt/custom_outbox/main/idf_component.yml new file mode 100644 index 0000000000..718194867b --- /dev/null +++ b/examples/protocols/mqtt/custom_outbox/main/idf_component.yml @@ -0,0 +1,3 @@ +dependencies: + protocol_examples_common: + path: ${IDF_PATH}/examples/common_components/protocol_examples_common diff --git a/examples/protocols/mqtt/custom_outbox/sdkconfig.defaults b/examples/protocols/mqtt/custom_outbox/sdkconfig.defaults new file mode 100644 index 0000000000..385da31baf --- /dev/null +++ b/examples/protocols/mqtt/custom_outbox/sdkconfig.defaults @@ -0,0 +1,2 @@ +CONFIG_MQTT_CUSTOM_OUTBOX=y +CONFIG_COMPILER_CXX_EXCEPTIONS=y