mirror of
https://github.com/espressif/esp-idf.git
synced 2024-10-05 20:47:46 -04:00
Merge branch 'feature/modbus_remove_example_tests' into 'master'
modbus remove example tests Closes IDFCI-1222 See merge request espressif/esp-idf!22434
This commit is contained in:
commit
df00fd874f
@ -1138,18 +1138,6 @@ example_test_007:
|
||||
- ESP32
|
||||
- Example_I2C_CCS811_SENSOR
|
||||
|
||||
example_test_011:
|
||||
extends: .example_test_esp32_template
|
||||
tags:
|
||||
- ESP32
|
||||
- Example_T2_RS485
|
||||
|
||||
example_test_016:
|
||||
extends: .example_test_esp32_template
|
||||
tags:
|
||||
- ESP32
|
||||
- Example_Modbus_TCP
|
||||
|
||||
example_test_017:
|
||||
extends: .example_test_esp32s2_template
|
||||
tags:
|
||||
|
@ -132,10 +132,6 @@ examples/protocols/mdns:
|
||||
examples/protocols/modbus:
|
||||
disable:
|
||||
- if: IDF_TARGET in ["esp32h2"]
|
||||
disable_test:
|
||||
- if: IDF_TARGET != "esp32"
|
||||
temporary: true
|
||||
reason: lack of runners
|
||||
|
||||
examples/protocols/mqtt/ssl:
|
||||
disable:
|
||||
|
@ -1,290 +0,0 @@
|
||||
# SPDX-FileCopyrightText: 2016-2021 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
# Need Python 3 string formatting functions
|
||||
from __future__ import print_function
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from threading import Thread
|
||||
|
||||
import ttfw_idf
|
||||
|
||||
LOG_LEVEL = logging.DEBUG
|
||||
LOGGER_NAME = 'modbus_test'
|
||||
|
||||
# Allowed parameter reads
|
||||
TEST_READ_MIN_COUNT = 10 # Minimum number of correct readings
|
||||
TEST_READ_MAX_ERR_COUNT = 2 # Maximum allowed read errors during initialization
|
||||
|
||||
TEST_THREAD_EXPECT_TIMEOUT = 120 # Test theread expect timeout in seconds
|
||||
TEST_THREAD_JOIN_TIMEOUT = 180 # Test theread join timeout in seconds
|
||||
|
||||
# Test definitions
|
||||
TEST_MASTER_RTU = 'master_rtu'
|
||||
TEST_SLAVE_RTU = 'slave_rtu'
|
||||
|
||||
TEST_MASTER_ASCII = 'master_ascii'
|
||||
TEST_SLAVE_ASCII = 'slave_ascii'
|
||||
|
||||
# Define tuple of strings to expect for each DUT.
|
||||
#
|
||||
master_expect = ('MASTER_TEST: Modbus master stack initialized...', 'MASTER_TEST: Start modbus test...', 'MASTER_TEST: Destroy master...')
|
||||
slave_expect = ('SLAVE_TEST: Modbus slave stack initialized.', 'SLAVE_TEST: Start modbus test...', 'SLAVE_TEST: Modbus controller destroyed.')
|
||||
|
||||
# The dictionary for expected values in listing
|
||||
expect_dict_master_ok = {'START': (),
|
||||
'READ_PAR_OK': (),
|
||||
'ALARM_MSG': (u'7',)}
|
||||
|
||||
expect_dict_master_err = {'READ_PAR_ERR': (u'263', u'ESP_ERR_TIMEOUT'),
|
||||
'READ_STK_ERR': (u'107', u'ESP_ERR_TIMEOUT')}
|
||||
|
||||
# The dictionary for regular expression patterns to check in listing
|
||||
pattern_dict_master_ok = {'START': (r'.*I \([0-9]+\) MASTER_TEST: Start modbus test...'),
|
||||
'READ_PAR_OK': (r'.*I\s\([0-9]+\) MASTER_TEST: Characteristic #[0-9]+ [a-zA-Z0-9_]+'
|
||||
r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.\s]*\(0x[a-zA-Z0-9]+\) read successful.'),
|
||||
'ALARM_MSG': (r'.*I \([0-9]*\) MASTER_TEST: Alarm triggered by cid #([0-9]+).')}
|
||||
|
||||
pattern_dict_master_err = {'READ_PAR_ERR_TOUT': (r'.*E \([0-9]+\) MASTER_TEST: Characteristic #[0-9]+'
|
||||
r'\s\([a-zA-Z0-9_]+\) read fail, err = [0-9]+ \([_A-Z]+\).'),
|
||||
'READ_STK_ERR_TOUT': (r'.*E \([0-9]+\) MB_CONTROLLER_MASTER: [a-zA-Z0-9_]+\([0-9]+\):\s'
|
||||
r'SERIAL master get parameter failure error=\(0x([a-zA-Z0-9]+)\) \(([_A-Z]+)\).')}
|
||||
|
||||
# The dictionary for expected values in listing
|
||||
expect_dict_slave_ok = {'START': (),
|
||||
'READ_PAR_OK': (),
|
||||
'DESTROY': ()}
|
||||
|
||||
# The dictionary for regular expression patterns to check in listing
|
||||
pattern_dict_slave_ok = {'START': (r'.*I \([0-9]+\) SLAVE_TEST: Start modbus test...'),
|
||||
'READ_PAR_OK': (r'.*I\s\([0-9]+\) SLAVE_TEST: [A-Z]+ READ \([a-zA-Z0-9_]+ us\),\s'
|
||||
r'ADDR:[0-9]+, TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'),
|
||||
'DESTROY': (r'.*I\s\([0-9]+\) SLAVE_TEST: Modbus controller destroyed.')}
|
||||
|
||||
logger = logging.getLogger(LOGGER_NAME)
|
||||
|
||||
|
||||
class DutTestThread(Thread):
|
||||
def __init__(self, dut=None, name=None, expect=None):
|
||||
""" Initialize the thread parameters
|
||||
"""
|
||||
self.tname = name
|
||||
self.dut = dut
|
||||
self.expected = expect
|
||||
self.result = False
|
||||
self.data = None
|
||||
super(DutTestThread, self).__init__()
|
||||
|
||||
def run(self):
|
||||
""" The function implements thread functionality
|
||||
"""
|
||||
# Must reset again as flashing during start_app will reset multiple times, causing unexpected results
|
||||
self.dut.reset()
|
||||
|
||||
# Capture output from the DUT
|
||||
self.dut.start_capture_raw_data()
|
||||
|
||||
# Check expected strings in the listing
|
||||
for string in self.expected:
|
||||
self.dut.expect(string, TEST_THREAD_EXPECT_TIMEOUT)
|
||||
|
||||
# Check DUT exceptions
|
||||
dut_exceptions = self.dut.get_exceptions()
|
||||
if 'Guru Meditation Error:' in dut_exceptions:
|
||||
raise Exception('%s generated an exception: %s\n' % (str(self.dut), dut_exceptions))
|
||||
|
||||
# Mark thread has run to completion without any exceptions
|
||||
self.data = self.dut.stop_capture_raw_data()
|
||||
self.result = True
|
||||
|
||||
|
||||
def test_filter_output(data=None, start_pattern=None, end_pattern=None):
|
||||
"""Use patters to filter output
|
||||
"""
|
||||
start_index = str(data).find(start_pattern)
|
||||
end_index = str(data).find(end_pattern)
|
||||
logger.debug('Listing start index= %d, end=%d' % (start_index, end_index))
|
||||
if start_index == -1 or end_index == -1:
|
||||
return data
|
||||
return data[start_index:end_index + len(end_pattern)]
|
||||
|
||||
|
||||
def test_expect_re(data, pattern):
|
||||
"""
|
||||
Check if re pattern is matched in data cache
|
||||
:param data: data to process
|
||||
:param pattern: compiled RegEx pattern
|
||||
:return: match groups if match succeed otherwise None
|
||||
"""
|
||||
ret = None
|
||||
if isinstance(pattern, type(u'')):
|
||||
pattern = pattern.encode('utf-8')
|
||||
regex = re.compile(pattern)
|
||||
if isinstance(data, type(u'')):
|
||||
data = data.encode('utf-8')
|
||||
match = regex.search(data)
|
||||
if match:
|
||||
ret = tuple(None if x is None else x.decode() for x in match.groups())
|
||||
index = match.end()
|
||||
else:
|
||||
index = None
|
||||
return ret, index
|
||||
|
||||
|
||||
def test_check_output(data=None, check_dict=None, expect_dict=None):
|
||||
""" Check output for the test
|
||||
Check log using regular expressions:
|
||||
"""
|
||||
global logger
|
||||
match_count = 0
|
||||
index = 0
|
||||
data_lines = data.splitlines()
|
||||
for key, pattern in check_dict.items():
|
||||
if key not in expect_dict:
|
||||
break
|
||||
# Check pattern in the each line
|
||||
for line in data_lines:
|
||||
group, index = test_expect_re(line, pattern)
|
||||
if index is not None:
|
||||
logger.debug('Found key{%s}=%s, line: \n%s' % (key, group, line))
|
||||
if expect_dict[key] == group:
|
||||
logger.debug('The result is correct for the key:%s, expected:%s == returned:%s' % (key, str(expect_dict[key]), str(group)))
|
||||
match_count += 1
|
||||
return match_count
|
||||
|
||||
|
||||
def test_check_mode(dut=None, mode_str=None, value=None):
|
||||
""" Check communication mode for dut
|
||||
"""
|
||||
global logger
|
||||
try:
|
||||
opt = dut.app.get_sdkconfig()[mode_str]
|
||||
logger.info('%s {%s} = %s.\n' % (str(dut), mode_str, opt))
|
||||
return value == opt
|
||||
except Exception:
|
||||
logger.info('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.' % (str(dut), mode_str))
|
||||
return False
|
||||
|
||||
|
||||
@ttfw_idf.idf_example_test(env_tag='Example_T2_RS485', target=['esp32'])
|
||||
def test_modbus_serial_communication(env, comm_mode):
|
||||
global logger
|
||||
|
||||
# Get device under test. "dut1 - master", "dut2 - slave" must be properly connected through RS485 interface driver
|
||||
dut_master = env.get_dut('modbus_master', 'examples/protocols/modbus/serial/mb_master', dut_class=ttfw_idf.ESP32DUT)
|
||||
dut_slave = env.get_dut('modbus_slave', 'examples/protocols/modbus/serial/mb_slave', dut_class=ttfw_idf.ESP32DUT)
|
||||
|
||||
try:
|
||||
logger.debug('Environment vars: %s\r\n' % os.environ)
|
||||
logger.debug('DUT slave sdkconfig: %s\r\n' % dut_slave.app.get_sdkconfig())
|
||||
logger.debug('DUT master sdkconfig: %s\r\n' % dut_master.app.get_sdkconfig())
|
||||
|
||||
# Check Kconfig configuration options for each built example
|
||||
if test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_ASCII', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_ASCII', 'y'):
|
||||
logger.info('ENV_TEST_INFO: Modbus ASCII test mode selected in the configuration. \n')
|
||||
slave_name = TEST_SLAVE_ASCII
|
||||
master_name = TEST_MASTER_ASCII
|
||||
elif test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_RTU', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_RTU', 'y'):
|
||||
logger.info('ENV_TEST_INFO: Modbus RTU test mode selected in the configuration. \n')
|
||||
slave_name = TEST_SLAVE_RTU
|
||||
master_name = TEST_MASTER_RTU
|
||||
else:
|
||||
logger.error("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n")
|
||||
raise Exception("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n")
|
||||
# Check if slave address for example application is default one to be able to communicate
|
||||
if not test_check_mode(dut_slave, 'CONFIG_MB_SLAVE_ADDR', '1'):
|
||||
logger.error('ENV_TEST_FAILURE: Slave address option is incorrect.\n')
|
||||
raise Exception('ENV_TEST_FAILURE: Slave address option is incorrect.\n')
|
||||
|
||||
# Flash app onto each DUT
|
||||
dut_master.start_app()
|
||||
dut_slave.start_app()
|
||||
|
||||
# Create thread for each dut
|
||||
dut_master_thread = DutTestThread(dut=dut_master, name=master_name, expect=master_expect)
|
||||
dut_slave_thread = DutTestThread(dut=dut_slave, name=slave_name, expect=slave_expect)
|
||||
|
||||
# Start each thread
|
||||
dut_slave_thread.start()
|
||||
dut_master_thread.start()
|
||||
|
||||
# Wait for threads to complete
|
||||
dut_slave_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
|
||||
dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
|
||||
|
||||
if dut_slave_thread.isAlive():
|
||||
logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
|
||||
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
||||
raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
|
||||
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
||||
|
||||
if dut_master_thread.isAlive():
|
||||
logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
|
||||
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
||||
raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
|
||||
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
||||
finally:
|
||||
dut_master.close()
|
||||
dut_slave.close()
|
||||
|
||||
# Check if test threads completed successfully and captured data
|
||||
if not dut_slave_thread.result or dut_slave_thread.data is None:
|
||||
logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname)
|
||||
raise Exception('The thread %s was not run successfully.' % dut_slave_thread.tname)
|
||||
|
||||
if not dut_master_thread.result or dut_master_thread.data is None:
|
||||
logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname)
|
||||
raise Exception('The thread %s was not run successfully.' % dut_master_thread.tname)
|
||||
|
||||
# Filter output to get test messages
|
||||
master_output = test_filter_output(dut_master_thread.data, master_expect[0], master_expect[len(master_expect) - 1])
|
||||
if master_output is not None:
|
||||
logger.info('The data for master thread is captured.')
|
||||
logger.debug(master_output)
|
||||
|
||||
slave_output = test_filter_output(dut_slave_thread.data, slave_expect[0], slave_expect[len(slave_expect) - 1])
|
||||
if slave_output is not None:
|
||||
logger.info('The data for slave thread is captured.')
|
||||
logger.debug(slave_output)
|
||||
|
||||
# Check if parameters are read correctly by master
|
||||
match_count = test_check_output(master_output, pattern_dict_master_ok, expect_dict_master_ok)
|
||||
if match_count < TEST_READ_MIN_COUNT:
|
||||
logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
|
||||
raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
|
||||
logger.info('OK pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count))
|
||||
|
||||
# If the test completed successfully (alarm triggered) but there are some errors during reading of parameters
|
||||
match_count = test_check_output(master_output, pattern_dict_master_err, expect_dict_master_err)
|
||||
if match_count > TEST_READ_MAX_ERR_COUNT:
|
||||
logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
|
||||
raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
|
||||
logger.info('ERROR pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count))
|
||||
|
||||
match_count = test_check_output(slave_output, pattern_dict_slave_ok, expect_dict_slave_ok)
|
||||
if match_count < TEST_READ_MIN_COUNT:
|
||||
logger.error('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count))
|
||||
raise Exception('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count))
|
||||
logger.info('OK pattern test for %s, match_count=%d.' % (dut_slave_thread.tname, match_count))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logger = logging.getLogger(LOGGER_NAME)
|
||||
# create file handler which logs even debug messages
|
||||
fh = logging.FileHandler('modbus_test.log')
|
||||
fh.setLevel(logging.DEBUG)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
# create console handler
|
||||
ch = logging.StreamHandler()
|
||||
ch.setLevel(logging.INFO)
|
||||
# set format of output for both handlers
|
||||
formatter = logging.Formatter('%(levelname)s:%(message)s')
|
||||
ch.setFormatter(formatter)
|
||||
fh.setFormatter(formatter)
|
||||
logger.addHandler(fh)
|
||||
logger.addHandler(ch)
|
||||
logger.info('Start script %s.' % os.path.basename(__file__))
|
||||
test_modbus_serial_communication()
|
||||
logging.shutdown()
|
@ -1,308 +0,0 @@
|
||||
# SPDX-FileCopyrightText: 2016-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from threading import Thread
|
||||
|
||||
import ttfw_idf
|
||||
from tiny_test_fw import DUT
|
||||
|
||||
LOG_LEVEL = logging.DEBUG
|
||||
LOGGER_NAME = 'modbus_test'
|
||||
|
||||
# Allowed options for the test
|
||||
TEST_READ_MAX_ERR_COUNT = 3 # Maximum allowed read errors during initialization
|
||||
TEST_THREAD_JOIN_TIMEOUT = 60 # Test theread join timeout in seconds
|
||||
TEST_EXPECT_STR_TIMEOUT = 30 # Test expect timeout in seconds
|
||||
TEST_MASTER_TCP = 'mb_tcp_master'
|
||||
TEST_SLAVE_TCP = 'mb_tcp_slave'
|
||||
|
||||
STACK_DEFAULT = 0
|
||||
STACK_IPV4 = 1
|
||||
STACK_IPV6 = 2
|
||||
STACK_INIT = 3
|
||||
STACK_CONNECT = 4
|
||||
STACK_START = 5
|
||||
STACK_PAR_OK = 6
|
||||
STACK_PAR_FAIL = 7
|
||||
STACK_DESTROY = 8
|
||||
|
||||
pattern_dict_slave = {STACK_IPV4: (r'.*I \([0-9]+\) example_common: - IPv4 address: ([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'),
|
||||
STACK_IPV6: (r'.*I \([0-9]+\) example_common: - IPv6 address: (([A-Fa-f0-9]{1,4}::?){1,7}[A-Fa-f0-9]{1,4}).*'),
|
||||
STACK_INIT: (r'.*I \(([0-9]+)\) MB_TCP_SLAVE_PORT: (Protocol stack initialized).'),
|
||||
STACK_CONNECT: (r'.*I\s\(([0-9]+)\) MB_TCP_SLAVE_PORT: Socket \(#[0-9]+\), accept client connection from address: '
|
||||
r'([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'),
|
||||
STACK_START: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: (Start modbus test).*'),
|
||||
STACK_PAR_OK: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: ([A-Z]+ [A-Z]+) \([a-zA-Z0-9_]+ us\),\s'
|
||||
r'ADDR:([0-9]+), TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'),
|
||||
STACK_PAR_FAIL: (r'.*E \(([0-9]+)\) SLAVE_TEST: Response time exceeds configured [0-9]+ [ms], ignore packet.*'),
|
||||
STACK_DESTROY: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: (Modbus controller destroyed).')}
|
||||
|
||||
pattern_dict_master = {STACK_IPV4: (r'.*I \([0-9]+\) example_common: - IPv4 address: ([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'),
|
||||
STACK_IPV6: (r'.*I \([0-9]+\) example_common: - IPv6 address: (([A-Fa-f0-9]{1,4}::?){1,7}[A-Fa-f0-9]{1,4}).*'),
|
||||
STACK_INIT: (r'.*I \(([0-9]+)\) MASTER_TEST: (Modbus master stack initialized).*'),
|
||||
STACK_CONNECT: (r'.*.*I\s\(([0-9]+)\) MB_TCP_MASTER_PORT: (Connected [0-9]+ slaves), start polling.*'),
|
||||
STACK_START: (r'.*I \(([0-9]+)\) MASTER_TEST: (Start modbus test).*'),
|
||||
STACK_PAR_OK: (r'.*I\s\(([0-9]+)\) MASTER_TEST: Characteristic #[0-9]+ ([a-zA-Z0-9_]+)'
|
||||
r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.]+ \(0x[a-zA-Z0-9]+\) read successful.*'),
|
||||
STACK_PAR_FAIL: (r'.*E \(([0-9]+)\) MASTER_TEST: Characteristic #[0-9]+\s\(([a-zA-Z0-9_]+)\)\s'
|
||||
r'read fail, err = [0-9]+ \([_A-Z]+\).*'),
|
||||
STACK_DESTROY: (r'.*I\s\(([0-9]+)\) MASTER_TEST: (Destroy master).*')}
|
||||
|
||||
logger = logging.getLogger(LOGGER_NAME)
|
||||
|
||||
|
||||
class DutTestThread(Thread):
|
||||
""" Test thread class
|
||||
"""
|
||||
def __init__(self, dut=None, name=None, ip_addr=None, expect=None):
|
||||
""" Initialize the thread parameters
|
||||
"""
|
||||
self.tname = name
|
||||
self.dut = dut
|
||||
self.expected = expect
|
||||
self.data = None
|
||||
self.ip_addr = ip_addr
|
||||
self.test_finish = False
|
||||
self.param_fail_count = 0
|
||||
self.param_ok_count = 0
|
||||
self.test_stage = STACK_DEFAULT
|
||||
super(DutTestThread, self).__init__()
|
||||
|
||||
def __enter__(self):
|
||||
logger.debug('Restart %s.', self.tname)
|
||||
# Reset DUT first
|
||||
self.dut.reset()
|
||||
# Capture output from the DUT
|
||||
self.dut.start_capture_raw_data(capture_id=self.dut.name)
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
""" The exit method of context manager
|
||||
"""
|
||||
if exc_type is not None or exc_value is not None:
|
||||
logger.info('Thread %s rised an exception type: %s, value: %s', self.tname, str(exc_type), str(exc_value))
|
||||
|
||||
def run(self):
|
||||
""" The function implements thread functionality
|
||||
"""
|
||||
# Initialize slave IP for master board
|
||||
if (self.ip_addr is not None):
|
||||
self.set_ip(0)
|
||||
|
||||
# Check expected strings in the listing
|
||||
self.test_start(TEST_EXPECT_STR_TIMEOUT)
|
||||
|
||||
# Check DUT exceptions
|
||||
dut_exceptions = self.dut.get_exceptions()
|
||||
if 'Guru Meditation Error:' in dut_exceptions:
|
||||
raise RuntimeError('%s generated an exception(s): %s\n' % (str(self.dut), dut_exceptions))
|
||||
|
||||
# Mark thread has run to completion without any exceptions
|
||||
self.data = self.dut.stop_capture_raw_data(capture_id=self.dut.name)
|
||||
|
||||
def set_ip(self, index=0):
|
||||
""" The method to send slave IP to master application
|
||||
"""
|
||||
message = r'.*Waiting IP([0-9]{1,2}) from stdin.*'
|
||||
# Read all data from previous restart to get prompt correctly
|
||||
self.dut.read()
|
||||
result = self.dut.expect(re.compile(message), timeout=TEST_EXPECT_STR_TIMEOUT)
|
||||
if int(result[0]) != index:
|
||||
raise RuntimeError('Incorrect index of IP=%s for %s\n' % (result[0], str(self.dut)))
|
||||
# Use the same slave IP address for all characteristics during the test
|
||||
self.dut.write('IP0=' + self.ip_addr, '\n', False)
|
||||
self.dut.write('IP1=' + self.ip_addr, '\n', False)
|
||||
self.dut.write('IP2=' + self.ip_addr, '\n', False)
|
||||
logger.debug('Set IP address=%s for %s', self.ip_addr, self.tname)
|
||||
message = r'.*IP\([0-9]+\) = \[([0-9a-zA-Z\.\:]+)\] set from stdin.*'
|
||||
result = self.dut.expect(re.compile(message), timeout=TEST_EXPECT_STR_TIMEOUT)
|
||||
logger.debug('Thread %s initialized with slave IP=%s.', self.tname, self.ip_addr)
|
||||
|
||||
def test_start(self, timeout_value):
|
||||
""" The method to initialize and handle test stages
|
||||
"""
|
||||
def handle_get_ip4(data):
|
||||
""" Handle get_ip v4
|
||||
"""
|
||||
logger.debug('%s[STACK_IPV4]: %s', self.tname, str(data))
|
||||
self.test_stage = STACK_IPV4
|
||||
|
||||
def handle_get_ip6(data):
|
||||
""" Handle get_ip v6
|
||||
"""
|
||||
logger.debug('%s[STACK_IPV6]: %s', self.tname, str(data))
|
||||
self.test_stage = STACK_IPV6
|
||||
|
||||
def handle_init(data):
|
||||
""" Handle init
|
||||
"""
|
||||
logger.debug('%s[STACK_INIT]: %s', self.tname, str(data))
|
||||
self.test_stage = STACK_INIT
|
||||
|
||||
def handle_connect(data):
|
||||
""" Handle connect
|
||||
"""
|
||||
logger.debug('%s[STACK_CONNECT]: %s', self.tname, str(data))
|
||||
self.test_stage = STACK_CONNECT
|
||||
|
||||
def handle_test_start(data):
|
||||
""" Handle connect
|
||||
"""
|
||||
logger.debug('%s[STACK_START]: %s', self.tname, str(data))
|
||||
self.test_stage = STACK_START
|
||||
|
||||
def handle_par_ok(data):
|
||||
""" Handle parameter ok
|
||||
"""
|
||||
logger.debug('%s[READ_PAR_OK]: %s', self.tname, str(data))
|
||||
if self.test_stage >= STACK_START:
|
||||
self.param_ok_count += 1
|
||||
self.test_stage = STACK_PAR_OK
|
||||
|
||||
def handle_par_fail(data):
|
||||
""" Handle parameter fail
|
||||
"""
|
||||
logger.debug('%s[READ_PAR_FAIL]: %s', self.tname, str(data))
|
||||
self.param_fail_count += 1
|
||||
self.test_stage = STACK_PAR_FAIL
|
||||
|
||||
def handle_destroy(data):
|
||||
""" Handle destroy
|
||||
"""
|
||||
logger.debug('%s[DESTROY]: %s', self.tname, str(data))
|
||||
self.test_stage = STACK_DESTROY
|
||||
self.test_finish = True
|
||||
|
||||
while not self.test_finish:
|
||||
try:
|
||||
self.dut.expect_any((re.compile(self.expected[STACK_IPV4]), handle_get_ip4),
|
||||
(re.compile(self.expected[STACK_IPV6]), handle_get_ip6),
|
||||
(re.compile(self.expected[STACK_INIT]), handle_init),
|
||||
(re.compile(self.expected[STACK_CONNECT]), handle_connect),
|
||||
(re.compile(self.expected[STACK_START]), handle_test_start),
|
||||
(re.compile(self.expected[STACK_PAR_OK]), handle_par_ok),
|
||||
(re.compile(self.expected[STACK_PAR_FAIL]), handle_par_fail),
|
||||
(re.compile(self.expected[STACK_DESTROY]), handle_destroy),
|
||||
timeout=timeout_value)
|
||||
except DUT.ExpectTimeout:
|
||||
logger.debug('%s, expect timeout on stage #%d (%s seconds)', self.tname, self.test_stage, timeout_value)
|
||||
self.test_finish = True
|
||||
|
||||
|
||||
def test_check_mode(dut=None, mode_str=None, value=None):
|
||||
""" Check communication mode for dut
|
||||
"""
|
||||
global logger
|
||||
try:
|
||||
opt = dut.app.get_sdkconfig()[mode_str]
|
||||
logger.debug('%s {%s} = %s.\n', str(dut), mode_str, opt)
|
||||
return value == opt
|
||||
except Exception:
|
||||
logger.error('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.', str(dut), mode_str)
|
||||
return False
|
||||
|
||||
|
||||
@ttfw_idf.idf_example_test(env_tag='Example_Modbus_TCP', target=['esp32'])
|
||||
def test_modbus_tcp_communication(env, comm_mode):
|
||||
global logger
|
||||
|
||||
rel_project_path = os.path.join('examples', 'protocols', 'modbus', 'tcp')
|
||||
# Get device under test. Both duts must be able to be connected to WiFi router
|
||||
dut_master = env.get_dut('modbus_tcp_master', os.path.join(rel_project_path, TEST_MASTER_TCP))
|
||||
dut_slave = env.get_dut('modbus_tcp_slave', os.path.join(rel_project_path, TEST_SLAVE_TCP))
|
||||
log_file = os.path.join(env.log_path, 'modbus_tcp_test.log')
|
||||
print('Logging file name: %s' % log_file)
|
||||
|
||||
try:
|
||||
# create file handler which logs even debug messages
|
||||
logger.setLevel(logging.DEBUG)
|
||||
fh = logging.FileHandler(log_file)
|
||||
fh.setLevel(logging.DEBUG)
|
||||
# set format of output for both handlers
|
||||
formatter = logging.Formatter('%(levelname)s:%(message)s')
|
||||
fh.setFormatter(formatter)
|
||||
logger.addHandler(fh)
|
||||
# create console handler
|
||||
ch = logging.StreamHandler()
|
||||
ch.setLevel(logging.INFO)
|
||||
# set format of output for both handlers
|
||||
formatter = logging.Formatter('%(levelname)s:%(message)s')
|
||||
ch.setFormatter(formatter)
|
||||
logger.addHandler(ch)
|
||||
|
||||
# Check Kconfig configuration options for each built example
|
||||
if (test_check_mode(dut_master, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y') and
|
||||
test_check_mode(dut_slave, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y')):
|
||||
slave_name = TEST_SLAVE_TCP
|
||||
master_name = TEST_MASTER_TCP
|
||||
else:
|
||||
logger.error('ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n')
|
||||
raise RuntimeError('ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n')
|
||||
address = None
|
||||
if test_check_mode(dut_master, 'CONFIG_MB_SLAVE_IP_FROM_STDIN', 'y'):
|
||||
logger.info('ENV_TEST_INFO: Set slave IP address through STDIN.\n')
|
||||
# Flash app onto DUT (Todo: Debug case when the slave flashed before master then expect does not work correctly for no reason
|
||||
dut_slave.start_app()
|
||||
dut_master.start_app()
|
||||
if test_check_mode(dut_master, 'CONFIG_EXAMPLE_CONNECT_IPV6', 'y'):
|
||||
address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV6]), TEST_EXPECT_STR_TIMEOUT)
|
||||
else:
|
||||
address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV4]), TEST_EXPECT_STR_TIMEOUT)
|
||||
if address is not None:
|
||||
print('Found IP slave address: %s' % address[0])
|
||||
else:
|
||||
raise RuntimeError('ENV_TEST_FAILURE: Slave IP address is not found in the output. Check network settings.\n')
|
||||
else:
|
||||
raise RuntimeError('ENV_TEST_FAILURE: Slave IP resolver is not configured correctly.\n')
|
||||
|
||||
# Create thread for each dut
|
||||
with DutTestThread(dut=dut_master, name=master_name, ip_addr=address[0], expect=pattern_dict_master) as dut_master_thread:
|
||||
with DutTestThread(dut=dut_slave, name=slave_name, ip_addr=None, expect=pattern_dict_slave) as dut_slave_thread:
|
||||
|
||||
# Start each thread
|
||||
dut_slave_thread.start()
|
||||
dut_master_thread.start()
|
||||
|
||||
# Wait for threads to complete
|
||||
dut_slave_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
|
||||
dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
|
||||
|
||||
if dut_slave_thread.is_alive():
|
||||
logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n',
|
||||
dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT)
|
||||
raise RuntimeError('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
|
||||
(dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
||||
|
||||
if dut_master_thread.is_alive():
|
||||
logger.error('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n',
|
||||
dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT)
|
||||
raise RuntimeError('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
|
||||
(dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
|
||||
|
||||
logger.info('TEST_INFO: %s error count = %d, %s error count = %d.\n',
|
||||
dut_master_thread.tname, dut_master_thread.param_fail_count,
|
||||
dut_slave_thread.tname, dut_slave_thread.param_fail_count)
|
||||
logger.info('TEST_INFO: %s ok count = %d, %s ok count = %d.\n',
|
||||
dut_master_thread.tname, dut_master_thread.param_ok_count,
|
||||
dut_slave_thread.tname, dut_slave_thread.param_ok_count)
|
||||
|
||||
if ((dut_master_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT) or
|
||||
(dut_slave_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT) or
|
||||
(dut_slave_thread.param_ok_count == 0) or
|
||||
(dut_master_thread.param_ok_count == 0)):
|
||||
raise RuntimeError('TEST_FAILURE: %s parameter read error(ok) count = %d(%d), %s parameter read error(ok) count = %d(%d).\n' %
|
||||
(dut_master_thread.tname, dut_master_thread.param_fail_count, dut_master_thread.param_ok_count,
|
||||
dut_slave_thread.tname, dut_slave_thread.param_fail_count, dut_slave_thread.param_ok_count))
|
||||
logger.info('TEST_SUCCESS: The Modbus parameter test is completed successfully.\n')
|
||||
|
||||
finally:
|
||||
dut_master.close()
|
||||
dut_slave.close()
|
||||
logging.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_modbus_tcp_communication()
|
Loading…
Reference in New Issue
Block a user