Merge branch 'bugfix/mqtt_ci_paho_thread' into 'master'

ci/mqtt: Fix publish-connect test (thread safe paho-mqtt issue)

Closes IDF-4607

See merge request espressif/esp-idf!16938
This commit is contained in:
David Čermák 2022-01-28 20:04:00 +00:00
commit 3741486778
2 changed files with 33 additions and 11 deletions

View File

@ -223,7 +223,7 @@ test_weekend_mqtt:
- .rules:labels:weekend_test
tags:
- ESP32
- Example_WIFI
- Example_EthKitV1
script:
- export MQTT_PUBLISH_TEST=1
- export TEST_PATH=$CI_PROJECT_DIR/tools/test_apps/protocols/mqtt/publish_connect_test

View File

@ -9,7 +9,9 @@ import ssl
import string
import subprocess
import sys
from threading import Event, Thread
import time
from itertools import count
from threading import Event, Lock, Thread
import paho.mqtt.client as mqtt
import ttfw_idf
@ -59,6 +61,7 @@ class MqttPublisher:
self.publish_cfg['qos'] = qos
self.publish_cfg['queue'] = queue
self.publish_cfg['transport'] = transport
self.lock = Lock()
# static variables used to pass options to and from static callbacks of paho-mqtt client
MqttPublisher.event_client_connected = Event()
MqttPublisher.event_client_got_all = Event()
@ -71,9 +74,11 @@ class MqttPublisher:
if self.log_details:
print(text)
def mqtt_client_task(self, client):
def mqtt_client_task(self, client, lock):
while not self.event_stop_client.is_set():
client.loop()
with lock:
client.loop()
time.sleep(0.001) # yield to other threads
# The callback for when the client receives a CONNACK response from the server (needs to be static)
@staticmethod
@ -120,18 +125,20 @@ class MqttPublisher:
self.print_details('ENV_TEST_FAILURE: Unexpected error while connecting to broker {}'.format(broker_host))
raise
# Starting a py-client in a separate thread
thread1 = Thread(target=self.mqtt_client_task, args=(self.client,))
thread1 = Thread(target=self.mqtt_client_task, args=(self.client, self.lock))
thread1.start()
self.print_details('Connecting py-client to broker {}:{}...'.format(broker_host, broker_port))
if not MqttPublisher.event_client_connected.wait(timeout=30):
raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_host))
self.client.subscribe(self.publish_cfg['subscribe_topic'], qos)
with self.lock:
self.client.subscribe(self.publish_cfg['subscribe_topic'], qos)
self.dut.write(' '.join(str(x) for x in (transport, self.sample_string, self.repeat, MqttPublisher.published, qos, queue)), eol='\n')
try:
# waiting till subscribed to defined topic
self.dut.expect(re.compile(r'MQTT_EVENT_SUBSCRIBED'), timeout=30)
for _ in range(MqttPublisher.published):
self.client.publish(self.publish_cfg['publish_topic'], self.sample_string * self.repeat, qos)
with self.lock:
self.client.publish(self.publish_cfg['publish_topic'], self.sample_string * self.repeat, qos)
self.print_details('Publishing...')
self.print_details('Checking esp-client received msg published from py-client...')
self.dut.expect(re.compile(r'Correct pattern received exactly x times'), timeout=60)
@ -377,16 +384,31 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
with MqttPublisher(dut1, transport, qos, repeat, published, queue, publish_cfg):
pass
# Initialize message sizes and repeat counts (if defined in the environment)
messages = []
for i in count(0):
# Check env variable: MQTT_PUBLISH_MSG_{len|repeat}_{x}
env_dict = {var:'MQTT_PUBLISH_MSG_' + var + '_' + str(i) for var in ['len', 'repeat']}
if os.getenv(env_dict['len']) and os.getenv(env_dict['repeat']):
messages.append({var: int(os.getenv(env_dict[var])) for var in ['len', 'repeat']})
continue
break
if not messages: # No message sizes present in the env - set defaults
messages = [{'len':0, 'repeat':5}, # zero-sized messages
{'len':2, 'repeat':10}, # short messages
{'len':200, 'repeat':3}, # long messages
{'len':20, 'repeat':50} # many medium sized
]
# Iterate over all publish message properties
for qos in [0, 1, 2]:
for transport in ['tcp', 'ssl', 'ws', 'wss']:
for q in [0, 1]:
if publish_cfg['broker_host_' + transport] is None:
print('Skipping transport: {}...'.format(transport))
continue
start_publish_case(transport, qos, 0, 5, q)
start_publish_case(transport, qos, 2, 5, q)
start_publish_case(transport, qos, 50, 1, q)
start_publish_case(transport, qos, 10, 20, q)
for msg in messages:
start_publish_case(transport, qos, msg['len'], msg['repeat'], q)
if __name__ == '__main__':