# Need Python 3 string formatting functions
from __future__ import print_function

import logging
import os
import re
from threading import Thread

import ttfw_idf

LOG_LEVEL = logging.DEBUG
LOGGER_NAME = 'modbus_test'

# Allowed parameter reads
TEST_READ_MIN_COUNT = 10            # Minimum number of correct readings
TEST_READ_MAX_ERR_COUNT = 2         # Maximum allowed read errors during initialization

TEST_THREAD_EXPECT_TIMEOUT = 120    # Test thread expect timeout in seconds
TEST_THREAD_JOIN_TIMEOUT = 180      # Test thread join timeout in seconds

# Test definitions
TEST_MASTER_RTU = 'master_rtu'
TEST_SLAVE_RTU = 'slave_rtu'

TEST_MASTER_ASCII = 'master_ascii'
TEST_SLAVE_ASCII = 'slave_ascii'

# Define tuple of strings to expect for each DUT.
#
master_expect = ('MASTER_TEST: Modbus master stack initialized...',
                 'MASTER_TEST: Start modbus test...',
                 'MASTER_TEST: Destroy master...')
slave_expect = ('SLAVE_TEST: Modbus slave stack initialized.',
                'SLAVE_TEST: Start modbus test...',
                'SLAVE_TEST: Modbus controller destroyed.')

# The dictionary for expected values in listing
expect_dict_master_ok = {'START': (),
                         'READ_PAR_OK': (),
                         'ALARM_MSG': (u'7',)}

# NOTE(review): the keys below ('READ_PAR_ERR', 'READ_STK_ERR') do not match the
# keys of pattern_dict_master_err ('READ_PAR_ERR_TOUT', 'READ_STK_ERR_TOUT'), so
# test_check_output() currently never counts any error lines. Left unchanged
# because aligning them would alter the pass/fail behaviour of the test - confirm
# the intended key names.
expect_dict_master_err = {'READ_PAR_ERR': (u'263', u'ESP_ERR_TIMEOUT'),
                          'READ_STK_ERR': (u'107', u'ESP_ERR_TIMEOUT')}

# The dictionary for regular expression patterns to check in listing
pattern_dict_master_ok = {'START': (r'.*I \([0-9]+\) MASTER_TEST: Start modbus test...'),
                          'READ_PAR_OK': (r'.*I\s\([0-9]+\) MASTER_TEST: Characteristic #[0-9]+ [a-zA-Z0-9_]+'
                                          r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.\s]*\(0x[a-zA-Z0-9]+\) read successful.'),
                          'ALARM_MSG': (r'.*I \([0-9]*\) MASTER_TEST: Alarm triggered by cid #([0-9]+).')}

pattern_dict_master_err = {'READ_PAR_ERR_TOUT': (r'.*E \([0-9]+\) MASTER_TEST: Characteristic #[0-9]+'
                                                 r'\s\([a-zA-Z0-9_]+\) read fail, err = [0-9]+ \([_A-Z]+\).'),
                           'READ_STK_ERR_TOUT': (r'.*E \([0-9]+\) MB_CONTROLLER_MASTER: [a-zA-Z0-9_]+\([0-9]+\):\s'
                                                 r'SERIAL master get parameter failure error=\(0x([a-zA-Z0-9]+)\) \(([_A-Z]+)\).')}

# The dictionary for expected values in listing
expect_dict_slave_ok = {'START': (),
                        'READ_PAR_OK': (),
                        'DESTROY': ()}

# The dictionary for regular expression patterns to check in listing
pattern_dict_slave_ok = {'START': (r'.*I \([0-9]+\) SLAVE_TEST: Start modbus test...'),
                         'READ_PAR_OK': (r'.*I\s\([0-9]+\) SLAVE_TEST: [A-Z]+ READ \([a-zA-Z0-9_]+ us\),\s'
                                         r'ADDR:[0-9]+, TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'),
                         'DESTROY': (r'.*I\s\([0-9]+\) SLAVE_TEST: Modbus controller destroyed.')}

logger = logging.getLogger(LOGGER_NAME)


def _fail(message):
    """Log `message` at error level and raise it as an Exception."""
    logger.error(message)
    raise Exception(message)


class DutTestThread(Thread):
    """Thread that resets one DUT, waits for its expected strings and captures its raw output."""

    def __init__(self, dut=None, name=None, expect=None):
        """ Initialize the thread parameters

        :param dut: device under test object driven by this thread
        :param name: test name, stored as `tname` and used in log messages
        :param expect: iterable of strings to expect from the DUT, in order
        """
        self.tname = name
        self.dut = dut
        self.expected = expect
        self.result = False     # Set to True only when run() completes without exception
        self.data = None        # Raw captured DUT output, set at the end of run()
        super(DutTestThread, self).__init__()

    def run(self):
        """ The function implements thread functionality

        :raises Exception: if the DUT exception listing contains 'Guru Meditation Error:'
        """
        # Must reset again as flashing during start_app will reset multiple times, causing unexpected results
        self.dut.reset()

        # Capture output from the DUT
        self.dut.start_capture_raw_data()

        # Check expected strings in the listing
        for string in self.expected:
            self.dut.expect(string, TEST_THREAD_EXPECT_TIMEOUT)

        # Check DUT exceptions
        dut_exceptions = self.dut.get_exceptions()
        if 'Guru Meditation Error:' in dut_exceptions:
            raise Exception('%s generated an exception: %s\n' % (str(self.dut), dut_exceptions))

        # Mark thread has run to completion without any exceptions
        self.data = self.dut.stop_capture_raw_data()
        self.result = True


def test_filter_output(data=None, start_pattern=None, end_pattern=None):
    """ Use patterns to filter output

    :param data: captured output to filter
    :param start_pattern: string marking the start of the region of interest
    :param end_pattern: string marking the end of the region of interest
    :return: the slice of data from start_pattern up to and including end_pattern,
             or data unchanged when either pattern is not found
    """
    start_index = str(data).find(start_pattern)
    end_index = str(data).find(end_pattern)
    logger.debug('Listing start index= %d, end=%d', start_index, end_index)
    if start_index == -1 or end_index == -1:
        return data
    return data[start_index:end_index + len(end_pattern)]


def test_expect_re(data, pattern):
    """ Check if re pattern is matched in data cache

    :param data: data to process (text or bytes; text is encoded to UTF-8)
    :param pattern: RegEx pattern (text or bytes; text is encoded to UTF-8)
    :return: tuple (groups, index): the decoded match groups and the end offset of
             the match, or (None, None) when the pattern does not match
    """
    ret = None
    # The search is performed on bytes, so text inputs are encoded first
    if isinstance(pattern, type(u'')):
        pattern = pattern.encode('utf-8')
    regex = re.compile(pattern)
    if isinstance(data, type(u'')):
        data = data.encode('utf-8')
    match = regex.search(data)
    if match:
        ret = tuple(None if x is None else x.decode() for x in match.groups())
        index = match.end()
    else:
        index = None
    return ret, index


def test_check_output(data=None, check_dict=None, expect_dict=None):
    """ Check output for the test
        Check log using regular expressions:

    :param data: captured output; it is split into lines before matching
    :param check_dict: dictionary key -> regular expression pattern
    :param expect_dict: dictionary key -> tuple of expected match groups
    :return: number of lines whose match groups equal the expected groups
    """
    match_count = 0
    data_lines = data.splitlines()
    for key, pattern in check_dict.items():
        if key not in expect_dict:
            # Nothing to compare against for this key; keep checking the other keys
            continue
        # Check pattern in the each line
        for line in data_lines:
            group, index = test_expect_re(line, pattern)
            if index is not None:
                logger.debug('Found key{%s}=%s, line: \n%s', key, group, line)
                if expect_dict[key] == group:
                    logger.debug('The result is correct for the key:%s, expected:%s == returned:%s',
                                 key, str(expect_dict[key]), str(group))
                    match_count += 1
    return match_count


def test_check_mode(dut=None, mode_str=None, value=None):
    """ Check communication mode for dut

    :param dut: device under test whose app sdkconfig is inspected
    :param mode_str: name of the sdkconfig option to read
    :param value: value the option is expected to have
    :return: True if the option equals value, False if it differs or cannot be read
    """
    try:
        opt = dut.app.get_sdkconfig()[mode_str]
        logger.info('%s {%s} = %s.\n', str(dut), mode_str, opt)
        return value == opt
    except Exception:
        logger.info('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.', str(dut), mode_str)
    return False


@ttfw_idf.idf_example_test(env_tag='Example_T2_RS485', target=['esp32'])
def test_modbus_communication(env, comm_mode):
    """ Run the Modbus serial master and slave examples on two DUTs and verify their logs

    :param env: test environment used to obtain the DUTs
    :param comm_mode: not used by this function
    :raises Exception: on configuration mismatch, thread timeout, thread failure,
                       too few successful reads or too many read errors
    """
    # Get device under test. "dut1 - master", "dut2 - slave" must be properly connected through RS485 interface driver
    dut_master = env.get_dut('modbus_master', 'examples/protocols/modbus/serial/mb_master', dut_class=ttfw_idf.ESP32DUT)
    dut_slave = env.get_dut('modbus_slave', 'examples/protocols/modbus/serial/mb_slave', dut_class=ttfw_idf.ESP32DUT)

    try:
        logger.debug('Environment vars: %s\r\n', os.environ)
        logger.debug('DUT slave sdkconfig: %s\r\n', dut_slave.app.get_sdkconfig())
        logger.debug('DUT master sdkconfig: %s\r\n', dut_master.app.get_sdkconfig())

        # Check Kconfig configuration options for each built example
        if (test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_ASCII', 'y') and
                test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_ASCII', 'y')):
            logger.info('ENV_TEST_INFO: Modbus ASCII test mode selected in the configuration. \n')
            slave_name = TEST_SLAVE_ASCII
            master_name = TEST_MASTER_ASCII
        elif (test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_RTU', 'y') and
                test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_RTU', 'y')):
            logger.info('ENV_TEST_INFO: Modbus RTU test mode selected in the configuration. \n')
            slave_name = TEST_SLAVE_RTU
            master_name = TEST_MASTER_RTU
        else:
            _fail("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n")

        # Check if slave address for example application is default one to be able to communicate
        if not test_check_mode(dut_slave, 'CONFIG_MB_SLAVE_ADDR', '1'):
            _fail('ENV_TEST_FAILURE: Slave address option is incorrect.\n')

        # Flash app onto each DUT
        dut_master.start_app()
        dut_slave.start_app()

        # Create thread for each dut
        dut_master_thread = DutTestThread(dut=dut_master, name=master_name, expect=master_expect)
        dut_slave_thread = DutTestThread(dut=dut_slave, name=slave_name, expect=slave_expect)

        # Start each thread
        dut_slave_thread.start()
        dut_master_thread.start()

        # Wait for threads to complete
        dut_slave_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
        dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)

        # A thread still alive after join() means the join timeout expired.
        # is_alive() is used because Thread.isAlive() was removed in Python 3.9.
        for thread in (dut_slave_thread, dut_master_thread):
            if thread.is_alive():
                _fail('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
                      (thread.tname, TEST_THREAD_JOIN_TIMEOUT))
    finally:
        dut_master.close()
        dut_slave.close()

    # Check if test threads completed successfully and captured data
    for thread in (dut_slave_thread, dut_master_thread):
        if not thread.result or thread.data is None:
            _fail('The thread %s was not run successfully.' % thread.tname)

    # Filter output to get test messages
    master_output = test_filter_output(dut_master_thread.data, master_expect[0], master_expect[-1])
    if master_output is not None:
        logger.info('The data for master thread is captured.')
        logger.debug(master_output)

    slave_output = test_filter_output(dut_slave_thread.data, slave_expect[0], slave_expect[-1])
    if slave_output is not None:
        logger.info('The data for slave thread is captured.')
        logger.debug(slave_output)

    # Check if parameters are read correctly by master
    match_count = test_check_output(master_output, pattern_dict_master_ok, expect_dict_master_ok)
    if match_count < TEST_READ_MIN_COUNT:
        _fail('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
    logger.info('OK pattern test for %s, match_count=%d.', dut_master_thread.tname, match_count)

    # If the test completed successfully (alarm triggered) but there are some errors during reading of parameters
    match_count = test_check_output(master_output, pattern_dict_master_err, expect_dict_master_err)
    if match_count > TEST_READ_MAX_ERR_COUNT:
        _fail('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
    logger.info('ERROR pattern test for %s, match_count=%d.', dut_master_thread.tname, match_count)

    match_count = test_check_output(slave_output, pattern_dict_slave_ok, expect_dict_slave_ok)
    if match_count < TEST_READ_MIN_COUNT:
        _fail('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count))
    logger.info('OK pattern test for %s, match_count=%d.', dut_slave_thread.tname, match_count)


if __name__ == '__main__':
    logger = logging.getLogger(LOGGER_NAME)
    # create file handler which logs even debug messages
    fh = logging.FileHandler('modbus_test.log')
    fh.setLevel(logging.DEBUG)
    logger.setLevel(logging.DEBUG)
    # create console handler
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    # set format of output for both handlers
    formatter = logging.Formatter('%(levelname)s:%(message)s')
    ch.setFormatter(formatter)
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(ch)
    logger.info('Start script %s.', os.path.basename(__file__))
    test_modbus_communication()
    logging.shutdown()