#!/usr/bin/env python # # Copyright 2018 Espressif Systems (Shanghai) PTE LTD # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Test script for unit test case. """ import argparse import re import threading import time import ttfw_idf from tiny_test_fw import DUT, Env, TinyFW, Utility from tiny_test_fw.TinyFW import TestCaseFailed from tiny_test_fw.Utility import format_case_id, handle_unexpected_exception UT_APP_BOOT_UP_DONE = "Press ENTER to see the list of tests." STRIP_CONFIG_PATTERN = re.compile(r'(.+?)(_\d+)?$') # matches e.g.: "rst:0xc (SW_CPU_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT)" RESET_PATTERN = re.compile(r"(rst:0x[0-9a-fA-F]*\s\([\w].*?\),boot:0x[0-9a-fA-F]*\s\([\w].*?\))") EXCEPTION_PATTERN = re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))") ABORT_PATTERN = re.compile(r"(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)") FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored") END_LIST_STR = r'\r?\nEnter test for running' TEST_PATTERN = re.compile(r'\((\d+)\)\s+"([^"]+)" ([^\r\n]+)\r?\n(' + END_LIST_STR + r')?') TEST_SUBMENU_PATTERN = re.compile(r'\s+\((\d+)\)\s+"[^"]+"\r?\n(?=(?=\()|(' + END_LIST_STR + r'))') UT_APP_PATH = "tools/unit-test-app" SIMPLE_TEST_ID = 0 MULTI_STAGE_ID = 1 MULTI_DEVICE_ID = 2 DEFAULT_TIMEOUT = 20 DUT_DELAY_AFTER_RESET = 2 DUT_STARTUP_CHECK_RETRY_COUNT = 5 TEST_HISTORY_CHECK_TIMEOUT = 2 def reset_reason_matches(reported_str, expected_str): known_aliases = { "_RESET": "_RST", "POWERON_RESET": "POWERON", "DEEPSLEEP_RESET": "DSLEEP", } if expected_str in reported_str: return True for token, alias in known_aliases.items(): if token in expected_str: alt_expected_str = expected_str.replace(token, alias) if alt_expected_str in reported_str: return True return False def format_test_case_config(test_case_data, target='esp32'): """ convert the test case data to unified format. We need to following info to run unit test cases: 1. unit test app config 2. test case name 3. test case reset info the formatted case config is a dict, with ut app config as keys. The value is a list of test cases. Each test case is a dict with "name" and "reset" as keys. For example:: case_config = { "default": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, {...}], "psram": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}], } If config is not specified for test case, then :param test_case_data: string, list, or a dictionary list :param target: target :return: formatted data """ case_config = dict() def parse_case(one_case_data): """ parse and format one case """ def process_reset_list(reset_list): # strip space and remove white space only items _output = list() for _r in reset_list: _data = _r.strip(" ") if _data: _output.append(_data) return _output _case = dict() if isinstance(one_case_data, str): _temp = one_case_data.split(" [reset=") _case["name"] = _temp[0] try: _case["reset"] = process_reset_list(_temp[1][0:-1].split(",")) except IndexError: _case["reset"] = list() elif isinstance(one_case_data, dict): _case = one_case_data.copy() assert "name" in _case if "reset" not in _case: _case["reset"] = list() else: if isinstance(_case["reset"], str): _case["reset"] = process_reset_list(_case["reset"].split(",")) else: raise TypeError("Not supported type during parsing unit test case") if "config" not in _case: _case["config"] = "default" if 'target' not in _case: _case['target'] = target return _case if not isinstance(test_case_data, list): test_case_data = [test_case_data] for case_data in test_case_data: parsed_case = parse_case(case_data) try: case_config[parsed_case["config"]].append(parsed_case) except KeyError: case_config[parsed_case["config"]] = [parsed_case] return case_config def replace_app_bin(dut, name, new_app_bin): if new_app_bin is None: return search_pattern = '/{}.bin'.format(name) for i, config in enumerate(dut.download_config): if config.endswith(search_pattern): dut.download_config[i] = new_app_bin Utility.console_log("The replaced application binary is {}".format(new_app_bin), "O") break def format_case_name(case): # we could split cases of same config into multiple binaries as we have limited rom space # we should regard those configs like `default` and `default_2` as the same config match = STRIP_CONFIG_PATTERN.match(case['config']) stripped_config_name = match.group(1) return format_case_id(case['name'], target=case['target'], config=stripped_config_name) def reset_dut(dut): dut.reset() # esptool ``run`` cmd takes quite long time. # before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened. # this could cause checking bootup print failed. # now use input cmd `-` and check test history to check if DUT is bootup. # we'll retry this step for a few times, # in case `dut.reset` returns during DUT bootup (when DUT can't process any command). # # during bootup, DUT might only receive part of the first `-` command. # If it only receive `\n`, then it will print all cases. It could take more than 5 seconds, reset check will fail. # To solve this problem, we will add a delay between reset and input `-` command. And we'll also enlarge expect timeout. time.sleep(DUT_DELAY_AFTER_RESET) for _ in range(DUT_STARTUP_CHECK_RETRY_COUNT): dut.write("-") try: dut.expect("0 Tests 0 Failures 0 Ignored", timeout=TEST_HISTORY_CHECK_TIMEOUT) break except DUT.ExpectTimeout: pass else: raise AssertionError("Reset {} ({}) failed!".format(dut.name, dut.port)) def log_test_case(description, test_case, ut_config): Utility.console_log("Running {} '{}' (config {})".format(description, test_case['name'], ut_config), color='orange') Utility.console_log('Tags: %s' % ', '.join('%s=%s' % (k, v) for (k, v) in test_case.items() if k != 'name' and v is not None), color='orange') def run_one_normal_case(dut, one_case, junit_test_case): reset_dut(dut) dut.start_capture_raw_data() # run test case dut.write("\"{}\"".format(one_case["name"])) dut.expect("Running " + one_case["name"] + "...") exception_reset_list = [] # we want to set this flag in callbacks (inner functions) # use list here so we can use append to set this flag test_finish = list() # expect callbacks def one_case_finish(result): """ one test finished, let expect loop break and log result """ test_finish.append(True) output = dut.stop_capture_raw_data() if result: Utility.console_log("Success: " + format_case_name(one_case), color="green") else: Utility.console_log("Failed: " + format_case_name(one_case), color="red") junit_test_case.add_failure_info(output) raise TestCaseFailed() def handle_exception_reset(data): """ just append data to exception list. exception list will be checked in ``handle_reset_finish``, once reset finished. """ exception_reset_list.append(data[0]) def handle_test_finish(data): """ test finished without reset """ # in this scenario reset should not happen assert not exception_reset_list if int(data[1]): # case ignored Utility.console_log("Ignored: " + format_case_name(one_case), color="orange") junit_test_case.add_skipped_info("ignored") one_case_finish(not int(data[0])) def handle_reset_finish(data): """ reset happened and reboot finished """ assert exception_reset_list # reboot but no exception/reset logged. should never happen result = False if len(one_case["reset"]) == len(exception_reset_list): for i, exception in enumerate(exception_reset_list): if not reset_reason_matches(exception, one_case["reset"][i]): break else: result = True if not result: err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"], exception_reset_list) Utility.console_log(err_msg, color="orange") junit_test_case.add_failure_info(err_msg) one_case_finish(result) while not test_finish: try: timeout_value = one_case["timeout"] dut.expect_any((RESET_PATTERN, handle_exception_reset), (EXCEPTION_PATTERN, handle_exception_reset), (ABORT_PATTERN, handle_exception_reset), (FINISH_PATTERN, handle_test_finish), (UT_APP_BOOT_UP_DONE, handle_reset_finish), timeout=timeout_value) except DUT.ExpectTimeout: Utility.console_log("Timeout in expect (%s seconds)" % timeout_value, color="orange") junit_test_case.add_failure_info("timeout") one_case_finish(False) break @ttfw_idf.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True) def run_unit_test_cases(env, extra_data): """ extra_data can be three types of value 1. as string: 1. "case_name" 2. "case_name [reset=RESET_REASON]" 2. as dict: 1. with key like {"name": "Intr_alloc test, shared ints"} 2. with key like {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET", "config": "psram"} 3. as list of string or dict: [case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...] :param env: test env instance :param extra_data: the case name or case list or case dictionary :return: None """ case_config = format_test_case_config(extra_data, env.default_dut_cls.TARGET) # we don't want stop on failed case (unless some special scenarios we can't handle) # this flag is used to log if any of the case failed during executing # Before exit test function this flag is used to log if the case fails failed_cases = [] for ut_config in case_config: Utility.console_log("Running unit test for config: " + ut_config, "O") dut = env.get_dut("unit-test-app", app_path=UT_APP_PATH, app_config_name=ut_config, allow_dut_exception=True) if len(case_config[ut_config]) > 0: replace_app_bin(dut, "unit-test-app", case_config[ut_config][0].get('app_bin')) dut.start_app() Utility.console_log("Download finished, start running test cases", "O") for one_case in case_config[ut_config]: log_test_case("test case", one_case, ut_config) performance_items = [] # create junit report test case junit_test_case = TinyFW.JunitReport.create_test_case(format_case_name(one_case)) try: run_one_normal_case(dut, one_case, junit_test_case) performance_items = dut.get_performance_items() except TestCaseFailed: failed_cases.append(format_case_name(one_case)) except Exception as e: handle_unexpected_exception(junit_test_case, e) failed_cases.append(format_case_name(one_case)) finally: TinyFW.JunitReport.update_performance(performance_items) TinyFW.JunitReport.test_case_finish(junit_test_case) # close DUT when finish running all cases for one config env.close_dut(dut.name) class Handler(threading.Thread): WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)]!') SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[([^]]+)](\[([^]]+)])?!') FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored") def __init__(self, dut, sent_signal_list, lock, parent_case_name, child_case_index, timeout): self.dut = dut self.sent_signal_list = sent_signal_list self.lock = lock self.parent_case_name = parent_case_name self.child_case_name = "" self.child_case_index = child_case_index + 1 self.finish = False self.result = False self.output = "" self.fail_name = None self.timeout = timeout self.force_stop = threading.Event() # it show the running status reset_dut(self.dut) # reset the board to make it start from begining threading.Thread.__init__(self, name="{} Handler".format(dut)) def run(self): self.dut.start_capture_raw_data() def get_child_case_name(data): self.child_case_name = data[0] time.sleep(1) self.dut.write(str(self.child_case_index)) def one_device_case_finish(result): """ one test finished, let expect loop break and log result """ self.finish = True self.result = result self.output = "[{}]\n\n{}\n".format(self.child_case_name, self.dut.stop_capture_raw_data()) if not result: self.fail_name = self.child_case_name def device_wait_action(data): start_time = time.time() expected_signal = data[0].encode('utf-8') while 1: if time.time() > start_time + self.timeout: Utility.console_log("Timeout in device for function: %s" % self.child_case_name, color="orange") break with self.lock: for sent_signal in self.sent_signal_list: if expected_signal == sent_signal["name"]: self.dut.write(sent_signal["parameter"]) self.sent_signal_list.remove(sent_signal) break else: time.sleep(0.01) continue break def device_send_action(data): with self.lock: self.sent_signal_list.append({ "name": data[0].encode('utf-8'), "parameter": "" if data[2] is None else data[2].encode('utf-8') # no parameter means we only write EOL to DUT }) def handle_device_test_finish(data): """ test finished without reset """ # in this scenario reset should not happen if int(data[1]): # case ignored Utility.console_log("Ignored: " + self.child_case_name, color="orange") one_device_case_finish(not int(data[0])) try: time.sleep(1) self.dut.write("\"{}\"".format(self.parent_case_name)) self.dut.expect("Running " + self.parent_case_name + "...") except DUT.ExpectTimeout: Utility.console_log("No case detected!", color="orange") while not self.finish and not self.force_stop.isSet(): try: self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), # noqa: W605 - regex get_child_case_name), (self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern (self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern (self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern timeout=self.timeout) except DUT.ExpectTimeout: Utility.console_log("Timeout in expect (%s seconds)" % self.timeout, color="orange") one_device_case_finish(False) break def stop(self): self.force_stop.set() def get_case_info(one_case): parent_case = one_case["name"] child_case_num = one_case["child case num"] return parent_case, child_case_num def get_dut(duts, env, name, ut_config, app_bin=None): if name in duts: dut = duts[name] else: dut = env.get_dut(name, app_path=UT_APP_PATH, app_config_name=ut_config, allow_dut_exception=True) duts[name] = dut replace_app_bin(dut, "unit-test-app", app_bin) dut.start_app() # download bin to board return dut def run_one_multiple_devices_case(duts, ut_config, env, one_case, app_bin, junit_test_case): lock = threading.RLock() threads = [] send_signal_list = [] result = True parent_case, case_num = get_case_info(one_case) for i in range(case_num): dut = get_dut(duts, env, "dut%d" % i, ut_config, app_bin) threads.append(Handler(dut, send_signal_list, lock, parent_case, i, one_case["timeout"])) for thread in threads: thread.setDaemon(True) thread.start() output = "Multiple Device Failed\n" for thread in threads: thread.join() result = result and thread.result output += thread.output if not thread.result: [thd.stop() for thd in threads] if not result: junit_test_case.add_failure_info(output) # collect performances from DUTs performance_items = [] for dut_name in duts: performance_items.extend(duts[dut_name].get_performance_items()) TinyFW.JunitReport.update_performance(performance_items) return result @ttfw_idf.idf_unit_test(env_tag="UT_T2_1", junit_report_by_case=True) def run_multiple_devices_cases(env, extra_data): """ extra_data can be two types of value 1. as dict: e.g. {"name": "gpio master/slave test example", "child case num": 2, "config": "release", "env_tag": "UT_T2_1"} 2. as list dict: e.g. [{"name": "gpio master/slave test example1", "child case num": 2, "config": "release", "env_tag": "UT_T2_1"}, {"name": "gpio master/slave test example2", "child case num": 2, "config": "release", "env_tag": "UT_T2_1"}] """ failed_cases = [] case_config = format_test_case_config(extra_data, env.default_dut_cls.TARGET) duts = {} for ut_config in case_config: Utility.console_log("Running unit test for config: " + ut_config, "O") for one_case in case_config[ut_config]: log_test_case("multi-device test", one_case, ut_config, ) result = False junit_test_case = TinyFW.JunitReport.create_test_case(format_case_name(one_case)) try: result = run_one_multiple_devices_case(duts, ut_config, env, one_case, one_case.get('app_bin'), junit_test_case) except TestCaseFailed: pass # result is False, this is handled by the finally block except Exception as e: handle_unexpected_exception(junit_test_case, e) finally: if result: Utility.console_log("Success: " + format_case_name(one_case), color="green") else: failed_cases.append(format_case_name(one_case)) Utility.console_log("Failed: " + format_case_name(one_case), color="red") TinyFW.JunitReport.test_case_finish(junit_test_case) # close all DUTs when finish running all cases for one config for dut in duts: env.close_dut(dut) duts = {} def run_one_multiple_stage_case(dut, one_case, junit_test_case): reset_dut(dut) dut.start_capture_raw_data() exception_reset_list = [] for test_stage in range(one_case["child case num"]): # select multi stage test case name dut.write("\"{}\"".format(one_case["name"])) dut.expect("Running " + one_case["name"] + "...") # select test function for current stage dut.write(str(test_stage + 1)) # we want to set this flag in callbacks (inner functions) # use list here so we can use append to set this flag stage_finish = list() def last_stage(): return test_stage == one_case["child case num"] - 1 def check_reset(): if one_case["reset"]: assert exception_reset_list # reboot but no exception/reset logged. should never happen result = False if len(one_case["reset"]) == len(exception_reset_list): for i, exception in enumerate(exception_reset_list): if not reset_reason_matches(exception, one_case["reset"][i]): break else: result = True if not result: err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"], exception_reset_list) Utility.console_log(err_msg, color="orange") junit_test_case.add_failure_info(err_msg) else: # we allow omit reset in multi stage cases result = True return result # expect callbacks def one_case_finish(result): """ one test finished, let expect loop break and log result """ # handle test finish result = result and check_reset() output = dut.stop_capture_raw_data() if result: Utility.console_log("Success: " + format_case_name(one_case), color="green") else: Utility.console_log("Failed: " + format_case_name(one_case), color="red") junit_test_case.add_failure_info(output) raise TestCaseFailed() stage_finish.append("break") def handle_exception_reset(data): """ just append data to exception list. exception list will be checked in ``handle_reset_finish``, once reset finished. """ exception_reset_list.append(data[0]) def handle_test_finish(data): """ test finished without reset """ # in this scenario reset should not happen if int(data[1]): # case ignored Utility.console_log("Ignored: " + format_case_name(one_case), color="orange") junit_test_case.add_skipped_info("ignored") # only passed in last stage will be regarded as real pass if last_stage(): one_case_finish(not int(data[0])) else: Utility.console_log("test finished before enter last stage", color="orange") one_case_finish(False) def handle_next_stage(data): """ reboot finished. we goto next stage """ if last_stage(): # already last stage, should never goto next stage Utility.console_log("didn't finish at last stage", color="orange") one_case_finish(False) else: stage_finish.append("continue") while not stage_finish: try: timeout_value = one_case["timeout"] dut.expect_any((RESET_PATTERN, handle_exception_reset), (EXCEPTION_PATTERN, handle_exception_reset), (ABORT_PATTERN, handle_exception_reset), (FINISH_PATTERN, handle_test_finish), (UT_APP_BOOT_UP_DONE, handle_next_stage), timeout=timeout_value) except DUT.ExpectTimeout: Utility.console_log("Timeout in expect (%s seconds)" % timeout_value, color="orange") one_case_finish(False) break if stage_finish[0] == "break": # test breaks on current stage break @ttfw_idf.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True) def run_multiple_stage_cases(env, extra_data): """ extra_data can be 2 types of value 1. as dict: Mandatory keys: "name" and "child case num", optional keys: "reset" and others 3. as list of string or dict: [case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...] :param env: test env instance :param extra_data: the case name or case list or case dictionary :return: None """ case_config = format_test_case_config(extra_data, env.default_dut_cls.TARGET) # we don't want stop on failed case (unless some special scenarios we can't handle) # this flag is used to log if any of the case failed during executing # Before exit test function this flag is used to log if the case fails failed_cases = [] for ut_config in case_config: Utility.console_log("Running unit test for config: " + ut_config, "O") dut = env.get_dut("unit-test-app", app_path=UT_APP_PATH, app_config_name=ut_config, allow_dut_exception=True) if len(case_config[ut_config]) > 0: replace_app_bin(dut, "unit-test-app", case_config[ut_config][0].get('app_bin')) dut.start_app() for one_case in case_config[ut_config]: log_test_case("multi-stage test", one_case, ut_config) performance_items = [] junit_test_case = TinyFW.JunitReport.create_test_case(format_case_name(one_case)) try: run_one_multiple_stage_case(dut, one_case, junit_test_case) performance_items = dut.get_performance_items() except TestCaseFailed: failed_cases.append(format_case_name(one_case)) except Exception as e: handle_unexpected_exception(junit_test_case, e) failed_cases.append(format_case_name(one_case)) finally: TinyFW.JunitReport.update_performance(performance_items) TinyFW.JunitReport.test_case_finish(junit_test_case) # close DUT when finish running all cases for one config env.close_dut(dut.name) def detect_update_unit_test_info(env, extra_data, app_bin): case_config = format_test_case_config(extra_data, env.default_dut_cls.TARGET) for ut_config in case_config: dut = env.get_dut("unit-test-app", app_path=UT_APP_PATH, app_config_name=ut_config) replace_app_bin(dut, "unit-test-app", app_bin) dut.start_app() reset_dut(dut) # get the list of test cases dut.write("") dut.expect("Here's the test menu, pick your combo:", timeout=DEFAULT_TIMEOUT) def find_update_dic(name, _t, _timeout, child_case_num=None): for _case_data in extra_data: if _case_data['name'] == name: _case_data['type'] = _t if 'timeout' not in _case_data: _case_data['timeout'] = _timeout if child_case_num: _case_data['child case num'] = child_case_num try: while True: data = dut.expect(TEST_PATTERN, timeout=DEFAULT_TIMEOUT) test_case_name = data[1] m = re.search(r'\[timeout=(\d+)\]', data[2]) if m: timeout = int(m.group(1)) else: timeout = 30 m = re.search(r'\[multi_stage\]', data[2]) if m: test_case_type = MULTI_STAGE_ID else: m = re.search(r'\[multi_device\]', data[2]) if m: test_case_type = MULTI_DEVICE_ID else: test_case_type = SIMPLE_TEST_ID find_update_dic(test_case_name, test_case_type, timeout) if data[3] and re.search(END_LIST_STR, data[3]): break continue # find the last submenu item data = dut.expect(TEST_SUBMENU_PATTERN, timeout=DEFAULT_TIMEOUT) find_update_dic(test_case_name, test_case_type, timeout, child_case_num=int(data[0])) if data[1] and re.search(END_LIST_STR, data[1]): break # check if the unit test case names are correct, i.e. they could be found in the device for _dic in extra_data: if 'type' not in _dic: raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(_dic.get('name'))) except DUT.ExpectTimeout: Utility.console_log("Timeout during getting the test list", color="red") finally: dut.close() # These options are the same for all configs, therefore there is no need to continue break if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument( '--repeat', '-r', help='Number of repetitions for the test(s). Default is 1.', type=int, default=1 ) parser.add_argument('--env_config_file', '-e', help='test env config file', default=None) parser.add_argument('--app_bin', '-b', help='application binary file for flashing the chip', default=None) parser.add_argument('test', help='Comma separated list of