Merge branch 'example/mqtt_custom_outbox' into 'master'

Custom outbox for esp_mqtt

See merge request espressif/esp-idf!24233
This commit is contained in:
Rocha Euripedes 2023-10-13 13:09:48 +08:00
commit 1a91a85e17
9 changed files with 711 additions and 0 deletions

View File

@ -143,6 +143,12 @@ examples/protocols/modbus:
temporary: true
reason: not supported on p4 # TODO: IDF-7869
examples/protocols/mqtt/custom_outbox:
disable:
- if: IDF_TARGET == "esp32p4"
temporary: true
reason: not supported on p4 # TODO: IDF-8077
examples/protocols/mqtt/ssl:
disable:
- if: IDF_TARGET == "esp32p4"

View File

@ -0,0 +1,18 @@
# The following four lines of boilerplate have to be in your project's CMakeLists
# in this exact order for cmake to work correctly
cmake_minimum_required(VERSION 3.16)
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
project(mqtt_tcp_custom_outbox)
# Add custom outbox implementation to mqtt component
idf_component_get_property(mqtt mqtt COMPONENT_LIB)
target_sources(${mqtt} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/main/custom_outbox.cpp)
# Our C++ needs an extra dependency to mqtt component, so we add it to mqtt component.
# This is needed because we are adding another source to the mqtt component and the build
# system needs to be aware of it to be able to compile and link the mqtt component.
# First we get our dependency
idf_component_get_property(pthread pthread COMPONENT_LIB)
# And them we link the components
target_link_libraries(${mqtt} ${pthread})

View File

@ -0,0 +1,100 @@
| Supported Targets | ESP32 | ESP32-C2 | ESP32-C3 | ESP32-C6 | ESP32-H2 | ESP32-S2 | ESP32-S3 |
| ----------------- | ----- | -------- | -------- | -------- | -------- | -------- | -------- |
# ESP-MQTT custom outbox sample application
(See the README.md file in the upper level 'examples' directory for more information about examples.)
This example is a slightly modified version of the tcp example to show how to configure a custom outbox.
This example connects to the broker URI selected using `idf.py menuconfig` (using mqtt tcp transport) and as a demonstration subscribes/unsubscribes and send a message on certain topic.
(Please note that the public broker is maintained by the community so may not be always available, for details please see this [disclaimer](https://iot.eclipse.org/getting-started/#sandboxes))
Note: If the URI equals `FROM_STDIN` then the broker address is read from stdin upon application startup (used for testing)
It uses ESP-MQTT library which implements mqtt client to connect to mqtt broker.
## Necessary changes to customize the outbox
To customize the outbox the first step is to enable it in the menuconfig option.
With this option enabled, the default implementation isn't defined and the function definition needs to be added to mqtt component.
Any extra dependencies needed by the new sources also need to be added to the mqtt component. Refer to the example CMakeLists.txt file
for the details on how to do it.
## The custom outbox in the example
For the sake of this example the customized outbox implements the same functionalits of the regular but using C++ as a language.
The implementation uses [C++ Polymorphic memory resources]() to control memory allocations and limit the usage of the memory.
## How to use example
### Hardware Required
This example can be executed on any ESP32 board, the only required interface is WiFi and connection to internet.
### Configure the project
* Open the project configuration menu (`idf.py menuconfig`)
* Configure Wi-Fi or Ethernet under "Example Connection Configuration" menu. See "Establishing Wi-Fi or Ethernet Connection" section in [examples/protocols/README.md](../../README.md) for more details.
Note that the mandatory configurations for this example, mqtt custom outbox and C++ exceptions are automatically added by the `sdkconfig.defaults` file.
### Build and Flash
Build the project and flash it to the board, then run monitor tool to view serial output:
```
idf.py -p PORT flash monitor
```
(To exit the serial monitor, type ``Ctrl-]``.)
See the Getting Started Guide for full steps to configure and use ESP-IDF to build projects.
## Example Output
```
I (4635) example_common: Connected to example_netif_sta
I (4645) example_common: - IPv4 address: 192.168.33.206,
I (4645) example_common: - IPv6 address: fe80:0000:0000:0000:7e9e:bdff:fecf:00c0, type: ESP_IP6_ADDR_IS_LINK_LOCAL
I (4655) Monotonic: Monotonic: 400 bytes allocated, 400 total bytes in use
I (4665) Monotonic: Monotonic: 1000 bytes allocated, 1400 total bytes in use
I (4675) Monotonic: Monotonic: 128 bytes allocated, 1528 total bytes in use
I (4685) Pool: Pool: 32 bytes allocated, 32 total bytes in use
I (4685) Monotonic: Monotonic: 7688 bytes allocated, 9216 total bytes in use
I (4695) Monotonic: Monotonic: 128 bytes allocated, 9344 total bytes in use
I (4705) Pool: Pool: 480 bytes allocated, 512 total bytes in use
I (4715) Monotonic: Monotonic: 992 bytes allocated, 10336 total bytes in use
I (4715) Monotonic: Monotonic: 128 bytes allocated, 10464 total bytes in use
I (4725) Pool: Pool: 23 bytes allocated, 535 total bytes in use
I (4735) MQTT_EXAMPLE: Enqueued msg_id=14345
I (4735) Pool: Pool: 29 bytes allocated, 564 total bytes in use
I (4745) MQTT_EXAMPLE: Enqueued msg_id=3507
I (4745) MQTT_EXAMPLE: Other event id:7
I (4755) main_task: Returned from app_main()
I (5085) MQTT_EXAMPLE: MQTT_EVENT_CONNECTED
I (5085) Pool: Pool: 23 bytes allocated, 587 total bytes in use
I (5085) MQTT_EXAMPLE: sent publish successful, msg_id=47425
I (5085) Pool: Pool: 18 bytes allocated, 605 total bytes in use
I (5095) MQTT_EXAMPLE: sent subscribe successful, msg_id=60709
I (5105) Pool: Pool: 18 bytes allocated, 623 total bytes in use
I (5105) MQTT_EXAMPLE: sent subscribe successful, msg_id=33273
I (5395) Pool: Pool: 23 bytes deallocated, 623 total bytes in use
I (5395) MQTT_EXAMPLE: MQTT_EVENT_PUBLISHED, msg_id=47425
I (6005) Pool: Pool: 18 bytes deallocated, 623 total bytes in use
I (6005) MQTT_EXAMPLE: MQTT_EVENT_SUBSCRIBED, msg_id=60709
I (6005) MQTT_EXAMPLE: sent publish successful, msg_id=0
I (6015) Pool: Pool: 18 bytes deallocated, 623 total bytes in use
I (6015) MQTT_EXAMPLE: MQTT_EVENT_SUBSCRIBED, msg_id=33273
I (6025) MQTT_EXAMPLE: sent publish successful, msg_id=0
I (6035) MQTT_EXAMPLE: MQTT_EVENT_DATA
TOPIC=/topic/qos1
DATA=data_3
I (6315) MQTT_EXAMPLE: MQTT_EVENT_DATA
TOPIC=/topic/qos1
DATA=data_3
I (6315) Pool: Pool: 23 bytes deallocated, 623 total bytes in use
I (6315) MQTT_EXAMPLE: MQTT_EVENT_PUBLISHED, msg_id=14345
I (6615) MQTT_EXAMPLE: MQTT_EVENT_DATA
TOPIC=/topic/qos0
DATA=data
```

View File

@ -0,0 +1,3 @@
idf_component_register(SRCS "app_main.c"
INCLUDE_DIRS "."
)

View File

@ -0,0 +1,13 @@
menu "Example Configuration"
config BROKER_URL
string "Broker URL"
default "mqtt://mqtt.eclipseprojects.io"
help
URL of the broker to connect to
config BROKER_URL_FROM_STDIN
bool
default y if BROKER_URL = "FROM_STDIN"
endmenu

View File

@ -0,0 +1,173 @@
/*
* SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
/* MQTT (over TCP) Example with custom outbox
This example code is in the Public Domain (or CC0 licensed, at your option.)
Unless required by applicable law or agreed to in writing, this
software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied.
*/
#include <stdio.h>
#include <stdint.h>
#include <stddef.h>
#include <string.h>
#include "esp_system.h"
#include "nvs_flash.h"
#include "esp_event.h"
#include "esp_netif.h"
#include "protocol_examples_common.h"
#include "esp_log.h"
#include "mqtt_client.h"
static const char *TAG = "MQTT_EXAMPLE";
static void log_error_if_nonzero(const char *message, int error_code)
{
if (error_code != 0) {
ESP_LOGE(TAG, "Last error %s: 0x%x", message, error_code);
}
}
/*
* @brief Event handler registered to receive MQTT events
*
* This function is called by the MQTT client event loop.
*
* @param handler_args user data registered to the event.
* @param base Event base for the handler(always MQTT Base in this example).
* @param event_id The id for the received event.
* @param event_data The data for the event, esp_mqtt_event_handle_t.
*/
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", base, event_id);
esp_mqtt_event_handle_t event = event_data;
esp_mqtt_client_handle_t client = event->client;
int msg_id;
switch ((esp_mqtt_event_id_t)event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
msg_id = esp_mqtt_client_publish(client, "/topic/qos1", "data_3", 0, 1, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
break;
case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_UNSUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_PUBLISHED:
ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data);
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err);
log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err);
log_error_if_nonzero("captured as transport's socket errno", event->error_handle->esp_transport_sock_errno);
ESP_LOGI(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno));
}
break;
default:
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
break;
}
}
static void mqtt_app_start(void)
{
esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = CONFIG_BROKER_URL,
};
#if CONFIG_BROKER_URL_FROM_STDIN
char line[128];
if (strcmp(mqtt_cfg.broker.address.uri, "FROM_STDIN") == 0) {
int count = 0;
printf("Please enter url of mqtt broker\n");
while (count < 128) {
int c = fgetc(stdin);
if (c == '\n') {
line[count] = '\0';
break;
} else if (c > 0 && c < 127) {
line[count] = c;
++count;
}
vTaskDelay(10 / portTICK_PERIOD_MS);
}
mqtt_cfg.broker.address.uri = line;
printf("Broker url: %s\n", line);
} else {
ESP_LOGE(TAG, "Configuration mismatch: wrong broker url");
abort();
}
#endif /* CONFIG_BROKER_URL_FROM_STDIN */
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
/* The last argument may be used to pass data to the event handler, in this example mqtt_event_handler */
esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL);
/*Let's enqueue a few messages to the outbox to see the allocations*/
int msg_id;
msg_id = esp_mqtt_client_enqueue(client, "/topic/qos1", "data_3", 0, 1, 0, true);
ESP_LOGI(TAG, "Enqueued msg_id=%d", msg_id);
msg_id = esp_mqtt_client_enqueue(client, "/topic/qos2", "QoS2 message", 0, 2, 0, true);
ESP_LOGI(TAG, "Enqueued msg_id=%d", msg_id);
/* Now we start the client and it's possible to see the memory usage for the operations in the outbox. */
esp_mqtt_client_start(client);
}
void app_main(void)
{
ESP_LOGI(TAG, "[APP] Startup..");
ESP_LOGI(TAG, "[APP] Free memory: %" PRIu32 " bytes", esp_get_free_heap_size());
ESP_LOGI(TAG, "[APP] IDF version: %s", esp_get_idf_version());
esp_log_level_set("*", ESP_LOG_INFO);
esp_log_level_set("mqtt_client", ESP_LOG_VERBOSE);
esp_log_level_set("MQTT_EXAMPLE", ESP_LOG_VERBOSE);
esp_log_level_set("TRANSPORT_BASE", ESP_LOG_VERBOSE);
esp_log_level_set("esp-tls", ESP_LOG_VERBOSE);
esp_log_level_set("TRANSPORT", ESP_LOG_VERBOSE);
esp_log_level_set("custom_outbox", ESP_LOG_VERBOSE);
ESP_ERROR_CHECK(nvs_flash_init());
ESP_ERROR_CHECK(esp_netif_init());
ESP_ERROR_CHECK(esp_event_loop_create_default());
/* This helper function configures Wi-Fi or Ethernet, as selected in menuconfig.
* Read "Establishing Wi-Fi or Ethernet Connection" section in
* examples/protocols/README.md for more information about this function.
*/
ESP_ERROR_CHECK(example_connect());
mqtt_app_start();
}

View File

@ -0,0 +1,393 @@
/*
* SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD
*
* SPDX-License-Identifier: Unlicense OR CC0-1.0
*/
#include <algorithm>
#include <cstddef>
#include <exception>
#include <deque>
#include <cstdint>
#include <memory>
#include <ranges>
#include <utility>
#include <vector>
#include <string>
#include <memory_resource>
#include "esp_log.h"
#include "mqtt_outbox.h"
constexpr auto TAG = "custom_outbox";
/*
* The trace resource class is created here as an example on how to build a custom memory resource
* The class is only needed to show where we are allocating from and to track allocations and deallocations.
*/
class trace_resource : public std::pmr::memory_resource {
public:
explicit trace_resource(std::string resource_name, std::pmr::memory_resource *upstream_resource = std::pmr::get_default_resource()) : upstream{upstream_resource}, name{std::move(resource_name)} {}
[[nodiscard]] std::string_view get_name() const noexcept
{
return std::string_view(name);
}
[[nodiscard]] auto upstream_resource() const
{
return upstream;
}
private:
void *do_allocate(std::size_t bytes, std::size_t alignment) override
{
auto *allocated = upstream->allocate(bytes, alignment);
allocated_total += bytes;
ESP_LOGI(name.c_str(), "%s: %zu bytes allocated, %zu total bytes in use", name.c_str(), bytes, allocated_total);
return allocated;
}
void do_deallocate(void *ptr, std::size_t bytes, std::size_t alignment) override
{
upstream->deallocate(ptr, bytes, alignment);
ESP_LOGI(name.c_str(), "%s: %zu bytes deallocated, %zu total bytes in use", name.c_str(), bytes, allocated_total);
}
[[nodiscard]] bool do_is_equal(const std::pmr::memory_resource &other) const noexcept override
{
return this == &other;
}
size_t allocated_total{};
std::pmr::memory_resource *upstream;
std::string name;
};
struct outbox_item {
/* Defining the allocator_type to let compiler know that our type is allocator aware,
* This way the allocator used for the outbox is propagated to the messages*/
using allocator_type = std::pmr::polymorphic_allocator<>;
/* Few strong types to diferetiate parameters*/
enum class id_t : int {};
enum class type_t : int {};
enum class qos_t : int {};
/* Allocator aware constructors */
outbox_item(
std::pmr::vector<uint8_t> message,
id_t msg_id,
type_t msg_type,
qos_t msg_qos,
outbox_tick_t tick,
pending_state_t pending_state,
allocator_type alloc = {}
) : message(std::move(message), alloc), id(msg_id), type(msg_type), qos(msg_qos), tick(tick), pending_state(pending_state) {}
/*Copy and move constructors have an extra allocator parameter, for copy default and allocator aware are the same.*/
outbox_item(const outbox_item &other, allocator_type alloc = {}) : message(other.message, alloc), id(other.id), type(other.type), qos(other.qos), tick(other.tick), pending_state(other.pending_state) {}
outbox_item(outbox_item &&other, allocator_type alloc) noexcept : message(std::move(other.message), alloc), id(other.id), type(other.type), qos(other.qos), tick(other.tick), pending_state(other.pending_state)
{}
outbox_item(const outbox_item &) = default;
outbox_item(outbox_item &&other) = default;
outbox_item &operator=(const outbox_item &rhs) = default;
outbox_item &operator=(outbox_item &&other) = default;
~outbox_item() = default;
/* Getters to support outbox operation */
[[nodiscard]] auto state() const noexcept
{
return pending_state;
}
[[nodiscard]] allocator_type get_allocator() const
{
return message.get_allocator();
}
void set(pending_state state) noexcept
{
pending_state = state;
}
void set(outbox_tick_t n_tick) noexcept
{
tick = n_tick;
}
[[nodiscard]] auto get_id() const noexcept
{
return id;
}
[[nodiscard]] auto get_type() const noexcept
{
return type;
}
[[nodiscard]] auto get_tick() const noexcept
{
return tick;
}
[[nodiscard]] auto get_data(size_t *len, uint16_t *msg_id, int *msg_type, int *msg_qos)
{
*len = message.size();
*msg_id = static_cast<uint16_t>(id);
*msg_type = static_cast<int>(type);
*msg_qos = static_cast<int>(qos);
return message.data();
}
[[nodiscard]] auto get_size() const noexcept
{
return message.size();
}
private:
std::pmr::vector<uint8_t> message;
id_t id;
type_t type;
qos_t qos;
outbox_tick_t tick;
pending_state_t pending_state;
};
/*
* For the outbox_t we let the special member functions as default and
* we don't extend the allocator aware versions for the sake of the simplicity, since the operations are not needed in the usage.
*/
struct outbox_t {
using allocator_type = std::pmr::polymorphic_allocator<>;
explicit outbox_t(allocator_type alloc = {}) : queue(alloc) {}
outbox_item_handle_t get(outbox_item::id_t msg_id)
{
if (auto item = std::ranges::find_if(queue, [msg_id](auto & item) {
return item.get_id() == msg_id;
});
item != std::end(queue)) {
return &(*item);
}
return nullptr;
}
int delete_expired(outbox_tick_t current_tick, outbox_tick_t timeout)
{
return std::erase_if(queue, [current_tick, timeout, this](const outbox_item & item) {
if (current_tick - item.get_tick() > timeout) {
total_size -= item.get_size();
return true;
}
return false;
});
}
outbox_item::id_t delete_single_expired(outbox_tick_t current_tick, outbox_tick_t timeout)
{
if (auto erase = std::ranges::find_if(queue, [current_tick, timeout](auto & item) {
return (current_tick - item.get_tick() > timeout);
}); erase != std::end(queue)) {
auto msg_id = erase->get_id();
total_size -= erase->get_size();
queue.erase(erase);
return msg_id;
}
return outbox_item::id_t{-1};
}
auto erase(outbox_item_handle_t to_erase)
{
return erase_if([to_erase](auto & item) {
return &item == to_erase;
});
}
auto erase(outbox_item::id_t msg_id, outbox_item::type_t msg_type)
{
return erase_if([msg_id, msg_type](auto & item) {
return (item.get_id() == msg_id && (item.get_type() == msg_type));
});
}
[[nodiscard]] auto size() const noexcept
{
return total_size;
}
void clear()
{
queue.clear();
}
outbox_item_handle_t enqueue(outbox_message_handle_t message, outbox_tick_t tick) noexcept
{
try {
auto &item =
queue.emplace_back(std::pmr::vector<uint8_t> {message->data, message->data + message->len},
outbox_item::id_t{message->msg_id},
outbox_item::type_t{message->msg_type},
outbox_item::qos_t{message->msg_qos},
tick,
QUEUED
);
total_size += item.get_size();
ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%" PRIu64, message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(this));
return &item;
} catch (const std::exception &e) {
return nullptr;
}
}
outbox_item_handle_t dequeue(pending_state_t state, outbox_tick_t *tick)
{
if (auto item = std::ranges::find_if(queue, [state](auto & item) {
return item.state() == state;
});
item != std::end(queue)) {
if (tick != nullptr) {
*tick = item->get_tick();
}
return &(*item);
}
return nullptr;
}
[[nodiscard]] allocator_type get_allocator() const
{
return queue.get_allocator();
}
private:
[[nodiscard]] esp_err_t erase_if(std::predicate<outbox_item &> auto &&predicate)
{
if (auto to_erase = std::ranges::find_if(queue, predicate); to_erase != std::end(queue)) {
total_size -= to_erase->get_size();
queue.erase(to_erase);
return ESP_OK;
}
return ESP_FAIL;
}
std::size_t total_size{};
std::pmr::deque<outbox_item> queue ;
};
extern "C" {
outbox_handle_t outbox_init()
{
/* First we create a fixed size memory buffer to be used. */
static constexpr auto work_memory_size = 16 * 1024;
static std::array<std::byte, work_memory_size> resource_buffer{};
try {
/*
* Since the outbox is managed by a C API we can't rely on C++ automatic cleanup and smart pointers but, on production code it would be better to add the
* memory resources to outbox_t, applying RAII principles, and make only outbox_item allocator aware. For the sake of the example we are keeping them
* separated to explictly show the relations.
* First we create the monotonic buffer and add null_memory_resource as upstream. This way if our working memory is exausted an exception is thrown.
*/
auto *monotonic_resource = new std::pmr::monotonic_buffer_resource{resource_buffer.data(), resource_buffer.size(), std::pmr::null_memory_resource()};
/*Here we add our custom trace wrapper type to trace allocations and deallocations*/
auto *trace_monotonic = new trace_resource("Monotonic", monotonic_resource);
/* We compose monotonic buffer with pool resource, since the monotonic deallocate is a no-op and we need to remove messages to not go out of memory.*/
auto *pool_resource = new std::pmr::unsynchronized_pool_resource{trace_monotonic};
auto *trace_pool = new trace_resource("Pool", pool_resource);
/* Our outbox class is created using the trace_pool as memory resource */
auto *outbox = new outbox_t{trace_pool};
return outbox;
} catch (const std::exception &e) {
ESP_LOGD(TAG, "Not enough memory to construct the outbox, review the resource_buffer size");
return nullptr;
}
}
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick)
{
return outbox->enqueue(message, tick);
}
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
{
return outbox->get(outbox_item::id_t{msg_id});
}
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick)
{
return outbox->dequeue(pending, tick);
}
}
uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos)
{
if (item == nullptr) {
return nullptr;
}
return item->get_data(len, msg_id, msg_type, qos);
}
esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete)
{
return outbox->erase(item_to_delete);
}
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type)
{
return outbox->erase(outbox_item::id_t{msg_id}, outbox_item::type_t{msg_type});
}
int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
{
return static_cast<int>(outbox->delete_single_expired(current_tick, timeout));
}
int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
{
return outbox->delete_expired(current_tick, timeout);
}
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending)
{
if (auto *item = outbox->get(outbox_item::id_t{msg_id}); item != nullptr) {
item->set(pending);
return ESP_OK;
}
return ESP_FAIL;
}
pending_state_t outbox_item_get_pending(outbox_item_handle_t item)
{
if (item != nullptr) {
return item->state();
}
return QUEUED;
}
esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick)
{
if (auto *item = outbox->get(outbox_item::id_t{msg_id}); item != nullptr) {
item->set(tick);
return ESP_OK;
}
return ESP_FAIL;
}
uint64_t outbox_get_size(outbox_handle_t outbox)
{
return outbox->size();
}
void outbox_delete_all_items(outbox_handle_t outbox)
{
outbox->clear();
}
void outbox_destroy(outbox_handle_t outbox)
{
auto *trace_pool = static_cast<trace_resource *>(outbox->get_allocator().resource());
auto *pool_resource = static_cast<std::pmr::unsynchronized_pool_resource *>(trace_pool->upstream_resource());
auto *trace_monotonic = static_cast<trace_resource *>(pool_resource->upstream_resource());
auto *monotonic_resource = static_cast<std::pmr::monotonic_buffer_resource *>(trace_monotonic->upstream_resource());
delete monotonic_resource;
delete trace_monotonic;
delete pool_resource;
delete trace_pool;
delete outbox;
}

View File

@ -0,0 +1,3 @@
dependencies:
protocol_examples_common:
path: ${IDF_PATH}/examples/common_components/protocol_examples_common

View File

@ -0,0 +1,2 @@
CONFIG_MQTT_CUSTOM_OUTBOX=y
CONFIG_COMPILER_CXX_EXCEPTIONS=y