From be59a273c6e037669fbd2160a9f356c9ce7745d7 Mon Sep 17 00:00:00 2001 From: aleks Date: Mon, 20 Feb 2023 11:06:27 +0100 Subject: [PATCH] modbus remove example tests --- .gitlab/ci/target-test.yml | 12 - .../protocols/modbus/serial/example_test.py | 290 ----------------- examples/protocols/modbus/tcp/example_test.py | 308 ------------------ 3 files changed, 610 deletions(-) delete mode 100644 examples/protocols/modbus/serial/example_test.py delete mode 100644 examples/protocols/modbus/tcp/example_test.py diff --git a/.gitlab/ci/target-test.yml b/.gitlab/ci/target-test.yml index 3efc70e699..349a30da7b 100644 --- a/.gitlab/ci/target-test.yml +++ b/.gitlab/ci/target-test.yml @@ -679,18 +679,6 @@ example_test_009: SETUP_TOOLS: "1" PYTHON_VER: 3 -example_test_011: - extends: .example_test_esp32_template - tags: - - ESP32 - - Example_T2_RS485 - -example_test_016: - extends: .example_test_esp32_template - tags: - - ESP32 - - Example_Modbus_TCP - example_test_017: extends: .example_test_esp32s2_template tags: diff --git a/examples/protocols/modbus/serial/example_test.py b/examples/protocols/modbus/serial/example_test.py deleted file mode 100644 index 4a8b15dec6..0000000000 --- a/examples/protocols/modbus/serial/example_test.py +++ /dev/null @@ -1,290 +0,0 @@ -# SPDX-FileCopyrightText: 2016-2021 Espressif Systems (Shanghai) CO LTD -# SPDX-License-Identifier: Apache-2.0 - -# Need Python 3 string formatting functions -from __future__ import print_function - -import logging -import os -import re -from threading import Thread - -import ttfw_idf - -LOG_LEVEL = logging.DEBUG -LOGGER_NAME = 'modbus_test' - -# Allowed parameter reads -TEST_READ_MIN_COUNT = 10 # Minimum number of correct readings -TEST_READ_MAX_ERR_COUNT = 2 # Maximum allowed read errors during initialization - -TEST_THREAD_EXPECT_TIMEOUT = 120 # Test theread expect timeout in seconds -TEST_THREAD_JOIN_TIMEOUT = 180 # Test theread join timeout in seconds - -# Test definitions -TEST_MASTER_RTU = 'master_rtu' -TEST_SLAVE_RTU = 'slave_rtu' - -TEST_MASTER_ASCII = 'master_ascii' -TEST_SLAVE_ASCII = 'slave_ascii' - -# Define tuple of strings to expect for each DUT. -# -master_expect = ('MASTER_TEST: Modbus master stack initialized...', 'MASTER_TEST: Start modbus test...', 'MASTER_TEST: Destroy master...') -slave_expect = ('SLAVE_TEST: Modbus slave stack initialized.', 'SLAVE_TEST: Start modbus test...', 'SLAVE_TEST: Modbus controller destroyed.') - -# The dictionary for expected values in listing -expect_dict_master_ok = {'START': (), - 'READ_PAR_OK': (), - 'ALARM_MSG': (u'7',)} - -expect_dict_master_err = {'READ_PAR_ERR': (u'263', u'ESP_ERR_TIMEOUT'), - 'READ_STK_ERR': (u'107', u'ESP_ERR_TIMEOUT')} - -# The dictionary for regular expression patterns to check in listing -pattern_dict_master_ok = {'START': (r'.*I \([0-9]+\) MASTER_TEST: Start modbus test...'), - 'READ_PAR_OK': (r'.*I\s\([0-9]+\) MASTER_TEST: Characteristic #[0-9]+ [a-zA-Z0-9_]+' - r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.\s]*\(0x[a-zA-Z0-9]+\) read successful.'), - 'ALARM_MSG': (r'.*I \([0-9]*\) MASTER_TEST: Alarm triggered by cid #([0-9]+).')} - -pattern_dict_master_err = {'READ_PAR_ERR_TOUT': (r'.*E \([0-9]+\) MASTER_TEST: Characteristic #[0-9]+' - r'\s\([a-zA-Z0-9_]+\) read fail, err = [0-9]+ \([_A-Z]+\).'), - 'READ_STK_ERR_TOUT': (r'.*E \([0-9]+\) MB_CONTROLLER_MASTER: [a-zA-Z0-9_]+\([0-9]+\):\s' - r'SERIAL master get parameter failure error=\(0x([a-zA-Z0-9]+)\) \(([_A-Z]+)\).')} - -# The dictionary for expected values in listing -expect_dict_slave_ok = {'START': (), - 'READ_PAR_OK': (), - 'DESTROY': ()} - -# The dictionary for regular expression patterns to check in listing -pattern_dict_slave_ok = {'START': (r'.*I \([0-9]+\) SLAVE_TEST: Start modbus test...'), - 'READ_PAR_OK': (r'.*I\s\([0-9]+\) SLAVE_TEST: [A-Z]+ READ \([a-zA-Z0-9_]+ us\),\s' - r'ADDR:[0-9]+, TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'), - 'DESTROY': (r'.*I\s\([0-9]+\) SLAVE_TEST: Modbus controller destroyed.')} - -logger = logging.getLogger(LOGGER_NAME) - - -class DutTestThread(Thread): - def __init__(self, dut=None, name=None, expect=None): - """ Initialize the thread parameters - """ - self.tname = name - self.dut = dut - self.expected = expect - self.result = False - self.data = None - super(DutTestThread, self).__init__() - - def run(self): - """ The function implements thread functionality - """ - # Must reset again as flashing during start_app will reset multiple times, causing unexpected results - self.dut.reset() - - # Capture output from the DUT - self.dut.start_capture_raw_data() - - # Check expected strings in the listing - for string in self.expected: - self.dut.expect(string, TEST_THREAD_EXPECT_TIMEOUT) - - # Check DUT exceptions - dut_exceptions = self.dut.get_exceptions() - if 'Guru Meditation Error:' in dut_exceptions: - raise Exception('%s generated an exception: %s\n' % (str(self.dut), dut_exceptions)) - - # Mark thread has run to completion without any exceptions - self.data = self.dut.stop_capture_raw_data() - self.result = True - - -def test_filter_output(data=None, start_pattern=None, end_pattern=None): - """Use patters to filter output - """ - start_index = str(data).find(start_pattern) - end_index = str(data).find(end_pattern) - logger.debug('Listing start index= %d, end=%d' % (start_index, end_index)) - if start_index == -1 or end_index == -1: - return data - return data[start_index:end_index + len(end_pattern)] - - -def test_expect_re(data, pattern): - """ - Check if re pattern is matched in data cache - :param data: data to process - :param pattern: compiled RegEx pattern - :return: match groups if match succeed otherwise None - """ - ret = None - if isinstance(pattern, type(u'')): - pattern = pattern.encode('utf-8') - regex = re.compile(pattern) - if isinstance(data, type(u'')): - data = data.encode('utf-8') - match = regex.search(data) - if match: - ret = tuple(None if x is None else x.decode() for x in match.groups()) - index = match.end() - else: - index = None - return ret, index - - -def test_check_output(data=None, check_dict=None, expect_dict=None): - """ Check output for the test - Check log using regular expressions: - """ - global logger - match_count = 0 - index = 0 - data_lines = data.splitlines() - for key, pattern in check_dict.items(): - if key not in expect_dict: - break - # Check pattern in the each line - for line in data_lines: - group, index = test_expect_re(line, pattern) - if index is not None: - logger.debug('Found key{%s}=%s, line: \n%s' % (key, group, line)) - if expect_dict[key] == group: - logger.debug('The result is correct for the key:%s, expected:%s == returned:%s' % (key, str(expect_dict[key]), str(group))) - match_count += 1 - return match_count - - -def test_check_mode(dut=None, mode_str=None, value=None): - """ Check communication mode for dut - """ - global logger - try: - opt = dut.app.get_sdkconfig()[mode_str] - logger.info('%s {%s} = %s.\n' % (str(dut), mode_str, opt)) - return value == opt - except Exception: - logger.info('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.' % (str(dut), mode_str)) - return False - - -@ttfw_idf.idf_example_test(env_tag='Example_T2_RS485', target=['esp32']) -def test_modbus_serial_communication(env, comm_mode): - global logger - - # Get device under test. "dut1 - master", "dut2 - slave" must be properly connected through RS485 interface driver - dut_master = env.get_dut('modbus_master', 'examples/protocols/modbus/serial/mb_master', dut_class=ttfw_idf.ESP32DUT) - dut_slave = env.get_dut('modbus_slave', 'examples/protocols/modbus/serial/mb_slave', dut_class=ttfw_idf.ESP32DUT) - - try: - logger.debug('Environment vars: %s\r\n' % os.environ) - logger.debug('DUT slave sdkconfig: %s\r\n' % dut_slave.app.get_sdkconfig()) - logger.debug('DUT master sdkconfig: %s\r\n' % dut_master.app.get_sdkconfig()) - - # Check Kconfig configuration options for each built example - if test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_ASCII', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_ASCII', 'y'): - logger.info('ENV_TEST_INFO: Modbus ASCII test mode selected in the configuration. \n') - slave_name = TEST_SLAVE_ASCII - master_name = TEST_MASTER_ASCII - elif test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_RTU', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_RTU', 'y'): - logger.info('ENV_TEST_INFO: Modbus RTU test mode selected in the configuration. \n') - slave_name = TEST_SLAVE_RTU - master_name = TEST_MASTER_RTU - else: - logger.error("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n") - raise Exception("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n") - # Check if slave address for example application is default one to be able to communicate - if not test_check_mode(dut_slave, 'CONFIG_MB_SLAVE_ADDR', '1'): - logger.error('ENV_TEST_FAILURE: Slave address option is incorrect.\n') - raise Exception('ENV_TEST_FAILURE: Slave address option is incorrect.\n') - - # Flash app onto each DUT - dut_master.start_app() - dut_slave.start_app() - - # Create thread for each dut - dut_master_thread = DutTestThread(dut=dut_master, name=master_name, expect=master_expect) - dut_slave_thread = DutTestThread(dut=dut_slave, name=slave_name, expect=slave_expect) - - # Start each thread - dut_slave_thread.start() - dut_master_thread.start() - - # Wait for threads to complete - dut_slave_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT) - dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT) - - if dut_slave_thread.isAlive(): - logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % - (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) - raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % - (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) - - if dut_master_thread.isAlive(): - logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % - (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) - raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % - (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) - finally: - dut_master.close() - dut_slave.close() - - # Check if test threads completed successfully and captured data - if not dut_slave_thread.result or dut_slave_thread.data is None: - logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname) - raise Exception('The thread %s was not run successfully.' % dut_slave_thread.tname) - - if not dut_master_thread.result or dut_master_thread.data is None: - logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname) - raise Exception('The thread %s was not run successfully.' % dut_master_thread.tname) - - # Filter output to get test messages - master_output = test_filter_output(dut_master_thread.data, master_expect[0], master_expect[len(master_expect) - 1]) - if master_output is not None: - logger.info('The data for master thread is captured.') - logger.debug(master_output) - - slave_output = test_filter_output(dut_slave_thread.data, slave_expect[0], slave_expect[len(slave_expect) - 1]) - if slave_output is not None: - logger.info('The data for slave thread is captured.') - logger.debug(slave_output) - - # Check if parameters are read correctly by master - match_count = test_check_output(master_output, pattern_dict_master_ok, expect_dict_master_ok) - if match_count < TEST_READ_MIN_COUNT: - logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count)) - raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count)) - logger.info('OK pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count)) - - # If the test completed successfully (alarm triggered) but there are some errors during reading of parameters - match_count = test_check_output(master_output, pattern_dict_master_err, expect_dict_master_err) - if match_count > TEST_READ_MAX_ERR_COUNT: - logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count)) - raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count)) - logger.info('ERROR pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count)) - - match_count = test_check_output(slave_output, pattern_dict_slave_ok, expect_dict_slave_ok) - if match_count < TEST_READ_MIN_COUNT: - logger.error('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count)) - raise Exception('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count)) - logger.info('OK pattern test for %s, match_count=%d.' % (dut_slave_thread.tname, match_count)) - - -if __name__ == '__main__': - logger = logging.getLogger(LOGGER_NAME) - # create file handler which logs even debug messages - fh = logging.FileHandler('modbus_test.log') - fh.setLevel(logging.DEBUG) - logger.setLevel(logging.DEBUG) - # create console handler - ch = logging.StreamHandler() - ch.setLevel(logging.INFO) - # set format of output for both handlers - formatter = logging.Formatter('%(levelname)s:%(message)s') - ch.setFormatter(formatter) - fh.setFormatter(formatter) - logger.addHandler(fh) - logger.addHandler(ch) - logger.info('Start script %s.' % os.path.basename(__file__)) - test_modbus_serial_communication() - logging.shutdown() diff --git a/examples/protocols/modbus/tcp/example_test.py b/examples/protocols/modbus/tcp/example_test.py deleted file mode 100644 index f3fa9ceddc..0000000000 --- a/examples/protocols/modbus/tcp/example_test.py +++ /dev/null @@ -1,308 +0,0 @@ -# SPDX-FileCopyrightText: 2016-2022 Espressif Systems (Shanghai) CO LTD -# SPDX-License-Identifier: Apache-2.0 - -import logging -import os -import re -from threading import Thread - -import ttfw_idf -from tiny_test_fw import DUT - -LOG_LEVEL = logging.DEBUG -LOGGER_NAME = 'modbus_test' - -# Allowed options for the test -TEST_READ_MAX_ERR_COUNT = 3 # Maximum allowed read errors during initialization -TEST_THREAD_JOIN_TIMEOUT = 60 # Test theread join timeout in seconds -TEST_EXPECT_STR_TIMEOUT = 30 # Test expect timeout in seconds -TEST_MASTER_TCP = 'mb_tcp_master' -TEST_SLAVE_TCP = 'mb_tcp_slave' - -STACK_DEFAULT = 0 -STACK_IPV4 = 1 -STACK_IPV6 = 2 -STACK_INIT = 3 -STACK_CONNECT = 4 -STACK_START = 5 -STACK_PAR_OK = 6 -STACK_PAR_FAIL = 7 -STACK_DESTROY = 8 - -pattern_dict_slave = {STACK_IPV4: (r'.*I \([0-9]+\) example_common: - IPv4 address: ([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'), - STACK_IPV6: (r'.*I \([0-9]+\) example_common: - IPv6 address: (([A-Fa-f0-9]{1,4}::?){1,7}[A-Fa-f0-9]{1,4}).*'), - STACK_INIT: (r'.*I \(([0-9]+)\) MB_TCP_SLAVE_PORT: (Protocol stack initialized).'), - STACK_CONNECT: (r'.*I\s\(([0-9]+)\) MB_TCP_SLAVE_PORT: Socket \(#[0-9]+\), accept client connection from address: ' - r'([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'), - STACK_START: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: (Start modbus test).*'), - STACK_PAR_OK: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: ([A-Z]+ [A-Z]+) \([a-zA-Z0-9_]+ us\),\s' - r'ADDR:([0-9]+), TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'), - STACK_PAR_FAIL: (r'.*E \(([0-9]+)\) SLAVE_TEST: Response time exceeds configured [0-9]+ [ms], ignore packet.*'), - STACK_DESTROY: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: (Modbus controller destroyed).')} - -pattern_dict_master = {STACK_IPV4: (r'.*I \([0-9]+\) example_common: - IPv4 address: ([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'), - STACK_IPV6: (r'.*I \([0-9]+\) example_common: - IPv6 address: (([A-Fa-f0-9]{1,4}::?){1,7}[A-Fa-f0-9]{1,4}).*'), - STACK_INIT: (r'.*I \(([0-9]+)\) MASTER_TEST: (Modbus master stack initialized).*'), - STACK_CONNECT: (r'.*.*I\s\(([0-9]+)\) MB_TCP_MASTER_PORT: (Connected [0-9]+ slaves), start polling.*'), - STACK_START: (r'.*I \(([0-9]+)\) MASTER_TEST: (Start modbus test).*'), - STACK_PAR_OK: (r'.*I\s\(([0-9]+)\) MASTER_TEST: Characteristic #[0-9]+ ([a-zA-Z0-9_]+)' - r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.]+ \(0x[a-zA-Z0-9]+\) read successful.*'), - STACK_PAR_FAIL: (r'.*E \(([0-9]+)\) MASTER_TEST: Characteristic #[0-9]+\s\(([a-zA-Z0-9_]+)\)\s' - r'read fail, err = [0-9]+ \([_A-Z]+\).*'), - STACK_DESTROY: (r'.*I\s\(([0-9]+)\) MASTER_TEST: (Destroy master).*')} - -logger = logging.getLogger(LOGGER_NAME) - - -class DutTestThread(Thread): - """ Test thread class - """ - def __init__(self, dut=None, name=None, ip_addr=None, expect=None): - """ Initialize the thread parameters - """ - self.tname = name - self.dut = dut - self.expected = expect - self.data = None - self.ip_addr = ip_addr - self.test_finish = False - self.param_fail_count = 0 - self.param_ok_count = 0 - self.test_stage = STACK_DEFAULT - super(DutTestThread, self).__init__() - - def __enter__(self): - logger.debug('Restart %s.', self.tname) - # Reset DUT first - self.dut.reset() - # Capture output from the DUT - self.dut.start_capture_raw_data(capture_id=self.dut.name) - return self - - def __exit__(self, exc_type, exc_value, traceback): - """ The exit method of context manager - """ - if exc_type is not None or exc_value is not None: - logger.info('Thread %s rised an exception type: %s, value: %s', self.tname, str(exc_type), str(exc_value)) - - def run(self): - """ The function implements thread functionality - """ - # Initialize slave IP for master board - if (self.ip_addr is not None): - self.set_ip(0) - - # Check expected strings in the listing - self.test_start(TEST_EXPECT_STR_TIMEOUT) - - # Check DUT exceptions - dut_exceptions = self.dut.get_exceptions() - if 'Guru Meditation Error:' in dut_exceptions: - raise RuntimeError('%s generated an exception(s): %s\n' % (str(self.dut), dut_exceptions)) - - # Mark thread has run to completion without any exceptions - self.data = self.dut.stop_capture_raw_data(capture_id=self.dut.name) - - def set_ip(self, index=0): - """ The method to send slave IP to master application - """ - message = r'.*Waiting IP([0-9]{1,2}) from stdin.*' - # Read all data from previous restart to get prompt correctly - self.dut.read() - result = self.dut.expect(re.compile(message), timeout=TEST_EXPECT_STR_TIMEOUT) - if int(result[0]) != index: - raise RuntimeError('Incorrect index of IP=%s for %s\n' % (result[0], str(self.dut))) - # Use the same slave IP address for all characteristics during the test - self.dut.write('IP0=' + self.ip_addr, '\n', False) - self.dut.write('IP1=' + self.ip_addr, '\n', False) - self.dut.write('IP2=' + self.ip_addr, '\n', False) - logger.debug('Set IP address=%s for %s', self.ip_addr, self.tname) - message = r'.*IP\([0-9]+\) = \[([0-9a-zA-Z\.\:]+)\] set from stdin.*' - result = self.dut.expect(re.compile(message), timeout=TEST_EXPECT_STR_TIMEOUT) - logger.debug('Thread %s initialized with slave IP=%s.', self.tname, self.ip_addr) - - def test_start(self, timeout_value): - """ The method to initialize and handle test stages - """ - def handle_get_ip4(data): - """ Handle get_ip v4 - """ - logger.debug('%s[STACK_IPV4]: %s', self.tname, str(data)) - self.test_stage = STACK_IPV4 - - def handle_get_ip6(data): - """ Handle get_ip v6 - """ - logger.debug('%s[STACK_IPV6]: %s', self.tname, str(data)) - self.test_stage = STACK_IPV6 - - def handle_init(data): - """ Handle init - """ - logger.debug('%s[STACK_INIT]: %s', self.tname, str(data)) - self.test_stage = STACK_INIT - - def handle_connect(data): - """ Handle connect - """ - logger.debug('%s[STACK_CONNECT]: %s', self.tname, str(data)) - self.test_stage = STACK_CONNECT - - def handle_test_start(data): - """ Handle connect - """ - logger.debug('%s[STACK_START]: %s', self.tname, str(data)) - self.test_stage = STACK_START - - def handle_par_ok(data): - """ Handle parameter ok - """ - logger.debug('%s[READ_PAR_OK]: %s', self.tname, str(data)) - if self.test_stage >= STACK_START: - self.param_ok_count += 1 - self.test_stage = STACK_PAR_OK - - def handle_par_fail(data): - """ Handle parameter fail - """ - logger.debug('%s[READ_PAR_FAIL]: %s', self.tname, str(data)) - self.param_fail_count += 1 - self.test_stage = STACK_PAR_FAIL - - def handle_destroy(data): - """ Handle destroy - """ - logger.debug('%s[DESTROY]: %s', self.tname, str(data)) - self.test_stage = STACK_DESTROY - self.test_finish = True - - while not self.test_finish: - try: - self.dut.expect_any((re.compile(self.expected[STACK_IPV4]), handle_get_ip4), - (re.compile(self.expected[STACK_IPV6]), handle_get_ip6), - (re.compile(self.expected[STACK_INIT]), handle_init), - (re.compile(self.expected[STACK_CONNECT]), handle_connect), - (re.compile(self.expected[STACK_START]), handle_test_start), - (re.compile(self.expected[STACK_PAR_OK]), handle_par_ok), - (re.compile(self.expected[STACK_PAR_FAIL]), handle_par_fail), - (re.compile(self.expected[STACK_DESTROY]), handle_destroy), - timeout=timeout_value) - except DUT.ExpectTimeout: - logger.debug('%s, expect timeout on stage #%d (%s seconds)', self.tname, self.test_stage, timeout_value) - self.test_finish = True - - -def test_check_mode(dut=None, mode_str=None, value=None): - """ Check communication mode for dut - """ - global logger - try: - opt = dut.app.get_sdkconfig()[mode_str] - logger.debug('%s {%s} = %s.\n', str(dut), mode_str, opt) - return value == opt - except Exception: - logger.error('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.', str(dut), mode_str) - return False - - -@ttfw_idf.idf_example_test(env_tag='Example_Modbus_TCP', target=['esp32']) -def test_modbus_tcp_communication(env, comm_mode): - global logger - - rel_project_path = os.path.join('examples', 'protocols', 'modbus', 'tcp') - # Get device under test. Both duts must be able to be connected to WiFi router - dut_master = env.get_dut('modbus_tcp_master', os.path.join(rel_project_path, TEST_MASTER_TCP)) - dut_slave = env.get_dut('modbus_tcp_slave', os.path.join(rel_project_path, TEST_SLAVE_TCP)) - log_file = os.path.join(env.log_path, 'modbus_tcp_test.log') - print('Logging file name: %s' % log_file) - - try: - # create file handler which logs even debug messages - logger.setLevel(logging.DEBUG) - fh = logging.FileHandler(log_file) - fh.setLevel(logging.DEBUG) - # set format of output for both handlers - formatter = logging.Formatter('%(levelname)s:%(message)s') - fh.setFormatter(formatter) - logger.addHandler(fh) - # create console handler - ch = logging.StreamHandler() - ch.setLevel(logging.INFO) - # set format of output for both handlers - formatter = logging.Formatter('%(levelname)s:%(message)s') - ch.setFormatter(formatter) - logger.addHandler(ch) - - # Check Kconfig configuration options for each built example - if (test_check_mode(dut_master, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y') and - test_check_mode(dut_slave, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y')): - slave_name = TEST_SLAVE_TCP - master_name = TEST_MASTER_TCP - else: - logger.error('ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n') - raise RuntimeError('ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n') - address = None - if test_check_mode(dut_master, 'CONFIG_MB_SLAVE_IP_FROM_STDIN', 'y'): - logger.info('ENV_TEST_INFO: Set slave IP address through STDIN.\n') - # Flash app onto DUT (Todo: Debug case when the slave flashed before master then expect does not work correctly for no reason - dut_slave.start_app() - dut_master.start_app() - if test_check_mode(dut_master, 'CONFIG_EXAMPLE_CONNECT_IPV6', 'y'): - address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV6]), TEST_EXPECT_STR_TIMEOUT) - else: - address = dut_slave.expect(re.compile(pattern_dict_slave[STACK_IPV4]), TEST_EXPECT_STR_TIMEOUT) - if address is not None: - print('Found IP slave address: %s' % address[0]) - else: - raise RuntimeError('ENV_TEST_FAILURE: Slave IP address is not found in the output. Check network settings.\n') - else: - raise RuntimeError('ENV_TEST_FAILURE: Slave IP resolver is not configured correctly.\n') - - # Create thread for each dut - with DutTestThread(dut=dut_master, name=master_name, ip_addr=address[0], expect=pattern_dict_master) as dut_master_thread: - with DutTestThread(dut=dut_slave, name=slave_name, ip_addr=None, expect=pattern_dict_slave) as dut_slave_thread: - - # Start each thread - dut_slave_thread.start() - dut_master_thread.start() - - # Wait for threads to complete - dut_slave_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT) - dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT) - - if dut_slave_thread.is_alive(): - logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n', - dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT) - raise RuntimeError('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % - (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) - - if dut_master_thread.is_alive(): - logger.error('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n', - dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT) - raise RuntimeError('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' % - (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT)) - - logger.info('TEST_INFO: %s error count = %d, %s error count = %d.\n', - dut_master_thread.tname, dut_master_thread.param_fail_count, - dut_slave_thread.tname, dut_slave_thread.param_fail_count) - logger.info('TEST_INFO: %s ok count = %d, %s ok count = %d.\n', - dut_master_thread.tname, dut_master_thread.param_ok_count, - dut_slave_thread.tname, dut_slave_thread.param_ok_count) - - if ((dut_master_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT) or - (dut_slave_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT) or - (dut_slave_thread.param_ok_count == 0) or - (dut_master_thread.param_ok_count == 0)): - raise RuntimeError('TEST_FAILURE: %s parameter read error(ok) count = %d(%d), %s parameter read error(ok) count = %d(%d).\n' % - (dut_master_thread.tname, dut_master_thread.param_fail_count, dut_master_thread.param_ok_count, - dut_slave_thread.tname, dut_slave_thread.param_fail_count, dut_slave_thread.param_ok_count)) - logger.info('TEST_SUCCESS: The Modbus parameter test is completed successfully.\n') - - finally: - dut_master.close() - dut_slave.close() - logging.shutdown() - - -if __name__ == '__main__': - test_modbus_tcp_communication()