From 5b8a9478a32804a6d2a059a8a588179f17a1c2ab Mon Sep 17 00:00:00 2001 From: houchenyao Date: Wed, 31 Jan 2018 18:59:10 +0800 Subject: [PATCH] CI: new CI ut framework, and can run it in local PC --- .gitlab-ci.yml | 30 +- tools/tiny-test-fw/CIAssignExampleTest.py | 137 +------ tools/tiny-test-fw/CIAssignUnitTest.py | 121 ++++++ tools/tiny-test-fw/IDF/IDFApp.py | 27 +- tools/tiny-test-fw/Utility/CIAssignTest.py | 206 +++++++++++ tools/tiny-test-fw/Utility/CaseConfig.py | 18 +- tools/tiny-test-fw/Utility/GitlabCIJob.py | 2 +- tools/unit-test-app/tools/UnitTestParser.py | 15 +- tools/unit-test-app/unit_test.py | 384 ++++++++++++++++++++ 9 files changed, 778 insertions(+), 162 deletions(-) create mode 100644 tools/tiny-test-fw/CIAssignUnitTest.py create mode 100644 tools/tiny-test-fw/Utility/CIAssignTest.py create mode 100644 tools/unit-test-app/unit_test.py diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index d813ef9812..2153a3d4ed 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -446,22 +446,21 @@ assign_test: - components/idf_test/*/CIConfigs - components/idf_test/*/TC.sqlite - $EXAMPLE_CONFIG_OUTPUT_PATH + - tools/unit-test-app/output expire_in: 1 mos before_script: *add_gitlab_key_before script: # first move test bins together: test_bins/CHIP_SDK/TestApp/bin_files - mkdir -p $OUTPUT_BIN_PATH - # copy and rename folder name to "UT_config" - - for CONFIG in $(ls $UT_BIN_PATH); do cp -r "$UT_BIN_PATH/$CONFIG" "$OUTPUT_BIN_PATH/UT_$CONFIG"; done - cp -r SSC/ssc_bin/* $OUTPUT_BIN_PATH # assign example tests - python $TEST_FW_PATH/CIAssignExampleTest.py $IDF_PATH/examples $IDF_PATH/.gitlab-ci.yml $EXAMPLE_CONFIG_OUTPUT_PATH + # assign unit test cases + - python $TEST_FW_PATH/CIAssignUnitTest.py $IDF_PATH/components/idf_test/unit_test/TestCaseAll.yml $IDF_PATH/.gitlab-ci.yml $IDF_PATH/components/idf_test/unit_test/CIConfigs # clone test script to assign tests - git clone $TEST_SCRIPT_REPOSITORY - cd auto_test_script - python $CHECKOUT_REF_SCRIPT auto_test_script - # assign unit test cases - - python CIAssignTestCases.py -t $IDF_PATH/components/idf_test/unit_test -c $IDF_PATH/.gitlab-ci.yml -b $IDF_PATH/test_bins # assgin integration test cases - python CIAssignTestCases.py -t $IDF_PATH/components/idf_test/integration_test -c $IDF_PATH/.gitlab-ci.yml -b $IDF_PATH/test_bins @@ -493,6 +492,17 @@ assign_test: # run test - python Runner.py $TEST_CASE_PATH -c $CONFIG_FILE +.unit_test_template: &unit_test_template + <<: *example_test_template + stage: unit_test + dependencies: + - assign_test + variables: + TEST_FW_PATH: "$CI_PROJECT_DIR/tools/tiny-test-fw" + TEST_CASE_PATH: "$CI_PROJECT_DIR/tools/unit-test-app" + CONFIG_FILE: "$CI_PROJECT_DIR/components/idf_test/unit_test/CIConfigs/$CI_JOB_NAME.yml" + LOG_PATH: "$CI_PROJECT_DIR/TEST_LOGS" + .test_template: &test_template stage: test when: on_success @@ -530,18 +540,6 @@ assign_test: # run test - python CIRunner.py -l "$LOG_PATH/$CI_JOB_NAME" -c $CONFIG_FILE -e $LOCAL_ENV_CONFIG_PATH -t $TEST_CASE_FILE_PATH -m $MODULE_UPDATE_FILE -# template for unit test jobs -.unit_test_template: &unit_test_template - <<: *test_template - allow_failure: false - stage: unit_test - variables: - LOCAL_ENV_CONFIG_PATH: "$CI_PROJECT_DIR/ci-test-runner-configs/$CI_RUNNER_DESCRIPTION/ESP32_IDF" - LOG_PATH: "$CI_PROJECT_DIR/$CI_COMMIT_SHA" - TEST_CASE_FILE_PATH: "$CI_PROJECT_DIR/components/idf_test/unit_test" - MODULE_UPDATE_FILE: "$CI_PROJECT_DIR/components/idf_test/ModuleDefinition.yml" - CONFIG_FILE: "$CI_PROJECT_DIR/components/idf_test/unit_test/CIConfigs/$CI_JOB_NAME.yml" - nvs_compatible_test: <<: *test_template artifacts: diff --git a/tools/tiny-test-fw/CIAssignExampleTest.py b/tools/tiny-test-fw/CIAssignExampleTest.py index 1cd3613131..f13a803e44 100644 --- a/tools/tiny-test-fw/CIAssignExampleTest.py +++ b/tools/tiny-test-fw/CIAssignExampleTest.py @@ -22,147 +22,16 @@ import sys import re import argparse -import yaml - test_fw_path = os.getenv("TEST_FW_PATH") if test_fw_path: sys.path.insert(0, test_fw_path) -from Utility import CaseConfig, SearchCases, GitlabCIJob +from Utility.CIAssignTest import AssignTest -class Group(object): - - MAX_EXECUTION_TIME = 30 - MAX_CASE = 15 - SORT_KEYS = ["env_tag"] - - def __init__(self, case): - self.execution_time = 0 - self.case_list = [case] - self.filters = dict(zip(self.SORT_KEYS, [case.case_info[x] for x in self.SORT_KEYS])) - - def accept_new_case(self): - """ - check if allowed to add any case to this group - - :return: True or False - """ - max_time = (sum([x.case_info["execution_time"] for x in self.case_list]) < self.MAX_EXECUTION_TIME) - max_case = (len(self.case_list) < self.MAX_CASE) - return max_time and max_case - - def add_case(self, case): - """ - add case to current group - - :param case: test case - :return: True if add succeed, else False - """ - added = False - if self.accept_new_case(): - for key in self.filters: - if case.case_info[key] != self.filters[key]: - break - else: - self.case_list.append(case) - added = True - return added - - def output(self): - """ - output data for job configs - - :return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group} - """ - output_data = { - "Filter": self.filters, - "CaseConfig": [{"name": x.case_info["name"]} for x in self.case_list], - } - return output_data - - -class AssignTest(object): - """ - Auto assign tests to CI jobs. - - :param test_case: path of test case file(s) - :param ci_config_file: path of ``.gitlab-ci.yml`` - """ - +class CIExampleAssignTest(AssignTest): CI_TEST_JOB_PATTERN = re.compile(r"^example_test_.+") - def __init__(self, test_case, ci_config_file): - self.test_cases = self._search_cases(test_case) - self.jobs = self._parse_gitlab_ci_config(ci_config_file) - - def _parse_gitlab_ci_config(self, ci_config_file): - - with open(ci_config_file, "r") as f: - ci_config = yaml.load(f) - - job_list = list() - for job_name in ci_config: - if self.CI_TEST_JOB_PATTERN.search(job_name) is not None: - job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name)) - return job_list - - @staticmethod - def _search_cases(test_case, case_filter=None): - """ - :param test_case: path contains test case folder - :param case_filter: filter for test cases - :return: filtered test case list - """ - test_methods = SearchCases.Search.search_test_cases(test_case) - return CaseConfig.filter_test_cases(test_methods, case_filter if case_filter else dict()) - - def _group_cases(self): - """ - separate all cases into groups according group rules. each group will be executed by one CI job. - - :return: test case groups. - """ - groups = [] - for case in self.test_cases: - for group in groups: - # add to current group - if group.add_case(case): - break - else: - # create new group - groups.append(Group(case)) - return groups - - def assign_cases(self): - """ - separate test cases to groups and assign test cases to CI jobs. - - :raise AssertError: if failed to assign any case to CI job. - :return: None - """ - failed_to_assign = [] - test_groups = self._group_cases() - for group in test_groups: - for job in self.jobs: - if job.match_group(group): - job.assign_group(group) - break - else: - failed_to_assign.append(group) - assert not failed_to_assign - - def output_configs(self, output_path): - """ - - :param output_path: path to output config files for each CI job - :return: None - """ - if not os.path.exists(output_path): - os.makedirs(output_path) - for job in self.jobs: - job.output_config(output_path) - if __name__ == '__main__': parser = argparse.ArgumentParser() @@ -174,6 +43,6 @@ if __name__ == '__main__': help="output path of config files") args = parser.parse_args() - assign_test = AssignTest(args.test_case, args.ci_config_file) + assign_test = CIExampleAssignTest(args.test_case, args.ci_config_file) assign_test.assign_cases() assign_test.output_configs(args.output_path) diff --git a/tools/tiny-test-fw/CIAssignUnitTest.py b/tools/tiny-test-fw/CIAssignUnitTest.py new file mode 100644 index 0000000000..a621eb8e0e --- /dev/null +++ b/tools/tiny-test-fw/CIAssignUnitTest.py @@ -0,0 +1,121 @@ +""" +Command line tool to assign unit tests to CI test jobs. +""" + +import re +import os +import sys +import argparse + +import yaml + +test_fw_path = os.getenv("TEST_FW_PATH") +if test_fw_path: + sys.path.insert(0, test_fw_path) + +from Utility import CIAssignTest + + +class Group(CIAssignTest.Group): + SORT_KEYS = ["Test App", "SDK", "test environment"] + MAX_CASE = 30 + ATTR_CONVERT_TABLE = { + "execution_time": "execution time" + } + + @staticmethod + def _get_case_attr(case, attr): + if attr in Group.ATTR_CONVERT_TABLE: + attr = Group.ATTR_CONVERT_TABLE[attr] + return case[attr] + + @staticmethod + def _get_ut_config(test_app): + # we format test app "UT_ + config" when parsing test cases + # now we need to extract config + assert test_app[:3] == "UT_" + return test_app[3:] + + def _create_extra_data(self): + case_data = [] + for case in self.case_list: + if self._get_case_attr(case, "cmd set") == "multiple_devices_case": + case_data.append({ + "config": self._get_ut_config(self._get_case_attr(case, "Test App")), + "name": self._get_case_attr(case, "summary"), + "child case num": self._get_case_attr(case, "child case num") + }) + else: + case_data.append({ + "config": self._get_ut_config(self._get_case_attr(case, "Test App")), + "name": self._get_case_attr(case, "summary"), + "reset": self._get_case_attr(case, "reset") , + }) + return case_data + + def output(self): + """ + output data for job configs + + :return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group} + """ + output_data = { + # we don't need filter for test function, as UT uses a few test functions for all cases + "CaseConfig": [ + { + "name": self.case_list[0]["cmd set"] if isinstance(self.case_list[0]["cmd set"], str) else self.case_list[0]["cmd set"][0], + "extra_data": self._create_extra_data(), + } + ] + } + return output_data + + +class UnitTestAssignTest(CIAssignTest.AssignTest): + CI_TEST_JOB_PATTERN = re.compile(r"^UT_.+") + + def __init__(self, test_case_path, ci_config_file): + CIAssignTest.AssignTest.__init__(self, test_case_path, ci_config_file, case_group=Group) + + @staticmethod + def _search_cases(test_case_path, case_filter=None): + """ + For unit test case, we don't search for test functions. + The unit test cases is stored in a yaml file which is created in job build-idf-test. + """ + + with open(test_case_path, "r") as f: + raw_data = yaml.load(f) + test_cases = raw_data["test cases"] + if case_filter: + for key in case_filter: + filtered_cases = [] + for case in test_cases: + try: + # bot converts string to lower case + if isinstance(case[key], str): + _value = case[key].lower() + else: + _value = case[key] + if _value in case_filter[key]: + filtered_cases.append(case) + except KeyError: + # case don't have this key, regard as filter success + filtered_cases.append(case) + test_cases = filtered_cases + return test_cases + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument("test_case", + help="test case folder or file") + parser.add_argument("ci_config_file", + help="gitlab ci config file") + parser.add_argument("output_path", + help="output path of config files") + args = parser.parse_args() + + assign_test = UnitTestAssignTest(args.test_case, args.ci_config_file) + assign_test.assign_cases() + assign_test.output_configs(args.output_path) diff --git a/tools/tiny-test-fw/IDF/IDFApp.py b/tools/tiny-test-fw/IDF/IDFApp.py index 3828277ed8..4bf667f64b 100644 --- a/tools/tiny-test-fw/IDF/IDFApp.py +++ b/tools/tiny-test-fw/IDF/IDFApp.py @@ -144,11 +144,28 @@ class Example(IDFApp): class UT(IDFApp): def get_binary_path(self, app_path): - if app_path: - # specified path, join it and the idf path - path = os.path.join(self.idf_path, app_path) - else: - path = os.path.join(self.idf_path, "tools", "unit-test-app", "build") + """ + :param app_path: app path or app config + :return: binary path + """ + if not app_path: + app_path = "default" + + path = os.path.join(self.idf_path, app_path) + if not os.path.exists(path): + while True: + # try to get by config + if app_path == "default": + # it's default config, we first try to get form build folder of unit-test-app + path = os.path.join(self.idf_path, "tools", "unit-test-app", "build") + if os.path.exists(path): + # found, use bin in build path + break + # ``make ut-build-all-configs`` or ``make ut-build-CONFIG`` will copy binary to output folder + path = os.path.join(self.idf_path, "tools", "unit-test-app", "output", app_path) + if os.path.exists(path): + break + raise OSError("Failed to get unit-test-app binary path") return path diff --git a/tools/tiny-test-fw/Utility/CIAssignTest.py b/tools/tiny-test-fw/Utility/CIAssignTest.py new file mode 100644 index 0000000000..ff1bf994b1 --- /dev/null +++ b/tools/tiny-test-fw/Utility/CIAssignTest.py @@ -0,0 +1,206 @@ +# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http:#www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Common logic to assign test cases to CI jobs. + +Some background knowledge about Gitlab CI and use flow in esp-idf: + +* Gitlab CI jobs are static in ``.gitlab-ci.yml``. We can't dynamically create test jobs +* For test job running on DUT, we use ``tags`` to select runners with different test environment +* We have ``assign_test`` stage, will collect cases, and then assign them to correct test jobs +* ``assign_test`` will fail if failed to assign any cases +* with ``assign_test``, we can: + * dynamically filter test case we want to test + * alert user if they forget to add CI jobs and guide how to add test jobs +* the last step of ``assign_test`` is to output config files, then test jobs will run these cases + +The Basic logic to assign test cases is as follow: + +1. do search all the cases +2. do filter case (if filter is specified by @bot) +3. put cases to different groups according to rule of ``Group`` + * try to put them in existed groups + * if failed then create a new group and add this case +4. parse and filter the test jobs from CI config file +5. try to assign all groups to jobs according to tags +6. output config files for jobs + +""" + +import os +import re +import json + +import yaml + +from Utility import (CaseConfig, SearchCases, GitlabCIJob) + + +class Group(object): + + MAX_EXECUTION_TIME = 30 + MAX_CASE = 15 + SORT_KEYS = ["env_tag"] + + def __init__(self, case): + self.execution_time = 0 + self.case_list = [case] + self.filters = dict(zip(self.SORT_KEYS, [self._get_case_attr(case, x) for x in self.SORT_KEYS])) + + @staticmethod + def _get_case_attr(case, attr): + # we might use different type for case (dict or test_func) + # this method will do get attribute form cases + return case.case_info[attr] + + def accept_new_case(self): + """ + check if allowed to add any case to this group + + :return: True or False + """ + max_time = (sum([self._get_case_attr(x, "execution_time") for x in self.case_list]) + < self.MAX_EXECUTION_TIME) + max_case = (len(self.case_list) < self.MAX_CASE) + return max_time and max_case + + def add_case(self, case): + """ + add case to current group + + :param case: test case + :return: True if add succeed, else False + """ + added = False + if self.accept_new_case(): + for key in self.filters: + if self._get_case_attr(case, key) != self.filters[key]: + break + else: + self.case_list.append(case) + added = True + return added + + def output(self): + """ + output data for job configs + + :return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group} + """ + output_data = { + "Filter": self.filters, + "CaseConfig": [{"name": self._get_case_attr(x, "name")} for x in self.case_list], + } + return output_data + + +class AssignTest(object): + """ + Auto assign tests to CI jobs. + + :param test_case_path: path of test case file(s) + :param ci_config_file: path of ``.gitlab-ci.yml`` + """ + # subclass need to rewrite CI test job pattern, to filter all test jobs + CI_TEST_JOB_PATTERN = re.compile(r"^test_.+") + + def __init__(self, test_case_path, ci_config_file, case_group=Group): + self.test_case_path = test_case_path + self.test_cases = [] + self.jobs = self._parse_gitlab_ci_config(ci_config_file) + self.case_group = case_group + + def _parse_gitlab_ci_config(self, ci_config_file): + + with open(ci_config_file, "r") as f: + ci_config = yaml.load(f) + + job_list = list() + for job_name in ci_config: + if self.CI_TEST_JOB_PATTERN.search(job_name) is not None: + job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name)) + return job_list + + @staticmethod + def _search_cases(test_case_path, case_filter=None): + """ + :param test_case_path: path contains test case folder + :param case_filter: filter for test cases + :return: filtered test case list + """ + test_methods = SearchCases.Search.search_test_cases(test_case_path) + return CaseConfig.filter_test_cases(test_methods, case_filter if case_filter else dict()) + + def _group_cases(self): + """ + separate all cases into groups according group rules. each group will be executed by one CI job. + + :return: test case groups. + """ + groups = [] + for case in self.test_cases: + for group in groups: + # add to current group + if group.add_case(case): + break + else: + # create new group + groups.append(self.case_group(case)) + return groups + + @staticmethod + def _apply_bot_filter(): + """ + we support customize CI test with bot. + here we process from and return the filter which ``_search_cases`` accepts. + + :return: filter for search test cases + """ + bot_filter = os.getenv("BOT_CASE_FILTER") + if bot_filter: + bot_filter = json.loads(bot_filter) + else: + bot_filter = dict() + return bot_filter + + def assign_cases(self): + """ + separate test cases to groups and assign test cases to CI jobs. + + :raise AssertError: if failed to assign any case to CI job. + :return: None + """ + failed_to_assign = [] + case_filter = self._apply_bot_filter() + self.test_cases = self._search_cases(self.test_case_path, case_filter) + test_groups = self._group_cases() + for group in test_groups: + for job in self.jobs: + if job.match_group(group): + job.assign_group(group) + break + else: + failed_to_assign.append(group) + assert not failed_to_assign + + def output_configs(self, output_path): + """ + :param output_path: path to output config files for each CI job + :return: None + """ + if not os.path.exists(output_path): + os.makedirs(output_path) + for job in self.jobs: + job.output_config(output_path) diff --git a/tools/tiny-test-fw/Utility/CaseConfig.py b/tools/tiny-test-fw/Utility/CaseConfig.py index 1fe5df42ba..af013ec282 100644 --- a/tools/tiny-test-fw/Utility/CaseConfig.py +++ b/tools/tiny-test-fw/Utility/CaseConfig.py @@ -51,6 +51,20 @@ import yaml import TestCase +def _convert_to_lower_case(item): + """ + bot filter is always lower case string. + this function will convert to all string to lower case. + """ + if isinstance(item, (tuple, list)): + output = [_convert_to_lower_case(v) for v in item] + elif isinstance(item, str): + output = item.lower() + else: + output = item + return output + + def _filter_one_case(test_method, case_filter): """ Apply filter for one case (the filter logic is the same as described in ``filter_test_cases``) """ filter_result = True @@ -58,7 +72,8 @@ def _filter_one_case(test_method, case_filter): if key in test_method.case_info: # the filter key is both in case and filter # we need to check if they match - filter_item, accepted_item = case_filter[key], test_method.case_info[key] + filter_item = _convert_to_lower_case(case_filter[key]) + accepted_item = _convert_to_lower_case(test_method.case_info[key]) if isinstance(filter_item, (tuple, list)) \ and isinstance(accepted_item, (tuple, list)): @@ -91,6 +106,7 @@ def filter_test_cases(test_methods, case_filter): * if one is list/tuple, the other one is string/int, then check if string/int is in list/tuple * if both are list/tuple, then check if they have common item 2. if only case attribute or filter have the key, filter succeed + 3. will do case insensitive compare for string for example, the following are match succeed scenarios (the rule is symmetric, result is same if exchange values for user filter and case attribute): diff --git a/tools/tiny-test-fw/Utility/GitlabCIJob.py b/tools/tiny-test-fw/Utility/GitlabCIJob.py index 05f6393c66..9d1223c94c 100644 --- a/tools/tiny-test-fw/Utility/GitlabCIJob.py +++ b/tools/tiny-test-fw/Utility/GitlabCIJob.py @@ -70,4 +70,4 @@ class Job(dict): file_name = os.path.join(file_path, self["name"] + ".yml") if "case group" in self: with open(file_name, "w") as f: - yaml.dump(self["case group"].output(), f) + yaml.dump(self["case group"].output(), f, default_flow_style=False) diff --git a/tools/unit-test-app/tools/UnitTestParser.py b/tools/unit-test-app/tools/UnitTestParser.py index c296077d31..f3ca496dc4 100644 --- a/tools/unit-test-app/tools/UnitTestParser.py +++ b/tools/unit-test-app/tools/UnitTestParser.py @@ -20,7 +20,8 @@ TEST_CASE_PATTERN = { "version": "v1 (2016-12-06)", "test environment": "UT_T1_1", "reset": "", - "expected result": "1. set succeed" + "expected result": "1. set succeed", + "cmd set": "test_unit_test_case", } CONFIG_FILE_PATTERN = { @@ -78,10 +79,10 @@ class Parser(object): name_addr = table.get_unsigned_int(section, test_addr, 4) desc_addr = table.get_unsigned_int(section, test_addr + 4, 4) file_name_addr = table.get_unsigned_int(section, test_addr + 12, 4) + function_count = table.get_unsigned_int(section, test_addr+20, 4) name = table.get_string("any", name_addr) desc = table.get_string("any", desc_addr) file_name = table.get_string("any", file_name_addr) - tc = self.parse_one_test_case(name, desc, file_name, app_name) # check if duplicated case names @@ -100,7 +101,13 @@ class Parser(object): self.test_env_tags[tc["test environment"]].append(tc["ID"]) else: self.test_env_tags.update({tc["test environment"]: [tc["ID"]]}) - # only add cases need to be executed + + if function_count > 1: + tc.update({"cmd set": "multiple_devices_case", + "child case num": function_count}) + del tc['reset'] + + # only add cases need to be executed test_cases.append(tc) os.remove("section_table.tmp") @@ -178,7 +185,6 @@ class Parser(object): test_case.update({"Test App": self.APP_NAME_PREFIX + app_name, "module": self.module_map[prop["module"]]['module'], "CI ready": "No" if prop["ignore"] == "Yes" else "Yes", - "cmd set": ["IDFUnitTest/UnitTest", [name]], "ID": tc_id, "test point 2": prop["module"], "steps": name, @@ -262,4 +268,3 @@ def main(): if __name__ == '__main__': main() - diff --git a/tools/unit-test-app/unit_test.py b/tools/unit-test-app/unit_test.py new file mode 100644 index 0000000000..bb6cf74e54 --- /dev/null +++ b/tools/unit-test-app/unit_test.py @@ -0,0 +1,384 @@ +""" +Test script for unit test case. +""" + +import re +import os +import sys +import time + +import threading + +# if we want to run test case outside `tiny-test-fw` folder, +# we need to insert tiny-test-fw path into sys path +test_fw_path = os.getenv("TEST_FW_PATH") +if test_fw_path and test_fw_path not in sys.path: + sys.path.insert(0, test_fw_path) + +import TinyFW +import IDF +import Utility +from DUT import ExpectTimeout +from IDF.IDFApp import UT + + +UT_APP_BOOT_UP_DONE = "Press ENTER to see the list of tests." +UT_TIMEOUT = 30 + +def format_test_case_config(test_case_data): + """ + convert the test case data to unified format. + We need to following info to run unit test cases: + + 1. unit test app config + 2. test case name + 3. test case reset info + + the formatted case config is a dict, with ut app config as keys. The value is a list of test cases. + Each test case is a dict with "name" and "reset" as keys. For example:: + + case_config = { + "default": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, {...}], + "psram": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}], + } + + If config is not specified for test case, then + + :param test_case_data: string, list, or a dictionary list + :return: formatted data + """ + + case_config = dict() + + def parse_case(one_case_data): + """ parse and format one case """ + + def process_reset_list(reset_list): + # strip space and remove white space only items + _output = list() + for _r in reset_list: + _data = _r.strip(" ") + if _data: + _output.append(_data) + return _output + + _case = dict() + if isinstance(one_case_data, str): + _temp = one_case_data.split(" [reset=") + _case["name"] = _temp[0] + try: + _case["reset"] = process_reset_list(_temp[1][0:-1].split(",")) + except IndexError: + _case["reset"] = list() + elif isinstance(one_case_data, dict): + _case = one_case_data.copy() + assert "name" in _case + if "reset" not in _case: + _case["reset"] = list() + else: + if isinstance(_case["reset"], str): + _case["reset"] = process_reset_list(_case["reset"].split(",")) + else: + raise TypeError("Not supported type during parsing unit test case") + + if "config" not in _case: + _case["config"] = "default" + + return _case + + if not isinstance(test_case_data, list): + test_case_data = [test_case_data] + + for case_data in test_case_data: + parsed_case = parse_case(case_data) + try: + case_config[parsed_case["config"]].append(parsed_case) + except KeyError: + case_config[parsed_case["config"]] = [parsed_case] + + return case_config + + +@TinyFW.test_method(app=UT, dut=IDF.IDFDUT, chip="ESP32", module="unit_test", + execution_time=1, env_tag="UT_T1_1") +def test_unit_test_case(env, extra_data): + """ + extra_data can be three types of value + 1. as string: + 1. "case_name" + 2. "case_name [reset=RESET_REASON]" + 2. as dict: + 1. with key like {"name": "Intr_alloc test, shared ints"} + 2. with key like {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET", "config": "psram"} + 3. as list of string or dict: + [case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...] + + :param extra_data: the case name or case list or case dictionary + :return: None + """ + + case_config = format_test_case_config(extra_data) + + # compile the patterns for expect only once + reset_pattern = re.compile(r"(ets [\w]{3}\s+[\d]{1,2} [\d]{4} [\d]{2}:[\d]{2}:[\d]{2}[^()]*\([\w].*?\))") + exception_pattern = re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))") + abort_pattern = re.compile(r"(abort\(\) was called at PC 0x[a-eA-E\d]{8} on core \d)") + finish_pattern = re.compile(r"1 Tests (\d) Failures (\d) Ignored") + + # we don't want stop on failed case (unless some special scenarios we can't handle) + # this flag is used to log if any of the case failed during executing + # Before exit test function this flag is used to log if the case fails + failed_cases = [] + + for ut_config in case_config: + dut = env.get_dut("unit-test-app", app_path=ut_config) + dut.start_app() + + for one_case in case_config[ut_config]: + dut.reset() + # esptool ``run`` cmd takes quite long time. + # before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened. + # this could cause checking bootup print failed. + # now we input cmd `-`, and check either bootup print or test history, + # to determine if DUT is ready to test. + dut.write("-", flush=False) + dut.expect_any(UT_APP_BOOT_UP_DONE, + "0 Tests 0 Failures 0 Ignored") + + # run test case + dut.write("\"{}\"".format(one_case["name"])) + dut.expect("Running " + one_case["name"] + "...") + + exception_reset_list = [] + + # we want to set this flag in callbacks (inner functions) + # use list here so we can use append to set this flag + test_finish = list() + + # expect callbacks + def one_case_finish(result): + """ one test finished, let expect loop break and log result """ + test_finish.append(True) + if result: + Utility.console_log("Success: " + one_case["name"], color="green") + else: + failed_cases.append(one_case["name"]) + Utility.console_log("Failed: " + one_case["name"], color="red") + + def handle_exception_reset(data): + """ + just append data to exception list. + exception list will be checked in ``handle_reset_finish``, once reset finished. + """ + exception_reset_list.append(data[0]) + + def handle_test_finish(data): + """ test finished without reset """ + # in this scenario reset should not happen + assert not exception_reset_list + if int(data[1]): + # case ignored + Utility.console_log("Ignored: " + one_case["name"], color="orange") + one_case_finish(not int(data[0])) + + def handle_reset_finish(data): + """ reset happened and reboot finished """ + assert exception_reset_list # reboot but no exception/reset logged. should never happen + result = False + if len(one_case["reset"]) == len(exception_reset_list): + for i, exception in enumerate(exception_reset_list): + if one_case["reset"][i] not in exception: + break + else: + result = True + if not result: + Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}""" + .format(one_case["reset"], exception_reset_list), + color="orange") + one_case_finish(result) + + while not test_finish: + try: + dut.expect_any((reset_pattern, handle_exception_reset), # reset pattern + (exception_pattern, handle_exception_reset), # exception pattern + (abort_pattern, handle_exception_reset), # abort pattern + (finish_pattern, handle_test_finish), # test finish pattern + (UT_APP_BOOT_UP_DONE, handle_reset_finish), # reboot finish pattern + timeout=UT_TIMEOUT) + except ExpectTimeout: + Utility.console_log("Timeout in expect", color="orange") + one_case_finish(False) + break + + # raise exception if any case fails + if failed_cases: + Utility.console_log("Failed Cases:", color="red") + for _case_name in failed_cases: + Utility.console_log("\t" + _case_name, color="red") + raise AssertionError("Unit Test Failed") + + +class Handler(threading.Thread): + + WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)\]!') + SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[(.+)\]!') + FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored") + + def __init__(self, dut, sent_signal_list, lock, parent_case_name, child_case_index, timeout=30): + self.dut = dut + self.sent_signal_list = sent_signal_list + self.lock = lock + self.parent_case_name = parent_case_name + self.child_case_name = "" + self.child_case_index = child_case_index + 1 + self.finish = False + self.result = False + self.fail_name = None + self.timeout = timeout + threading.Thread.__init__(self, name="{} Handler".format(dut)) + + def run(self): + def get_child_case_name(data): + self.child_case_name = data[0] + time.sleep(1) + self.dut.write(str(self.child_case_index)) + + def one_device_case_finish(result): + """ one test finished, let expect loop break and log result """ + self.finish = True + self.result = result + if not result: + self.fail_name = self.child_case_name + + def device_wait_action(data): + start_time = time.time() + expected_signal = data[0] + while 1: + if time.time() > start_time + self.timeout: + Utility.console_log("Timeout in device for function: %s"%self.child_case_name, color="orange") + break + with self.lock: + if expected_signal in self.sent_signal_list: + self.dut.write(" ") + self.sent_signal_list.remove(expected_signal) + break + time.sleep(0.01) + + def device_send_action(data): + with self.lock: + self.sent_signal_list.append(data[0].encode('utf-8')) + + def handle_device_test_finish(data): + """ test finished without reset """ + # in this scenario reset should not happen + if int(data[1]): + # case ignored + Utility.console_log("Ignored: " + self.child_case_name, color="orange") + one_device_case_finish(not int(data[0])) + + self.dut.reset() + self.dut.write("-", flush=False) + self.dut.expect_any(UT_APP_BOOT_UP_DONE, "0 Tests 0 Failures 0 Ignored") + time.sleep(1) + self.dut.write("\"{}\"".format(self.parent_case_name)) + self.dut.expect("Running " + self.parent_case_name + "...") + + while not self.finish: + try: + self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), get_child_case_name), + (self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern + (self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern + (self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern + timeout=UT_TIMEOUT) + except ExpectTimeout: + Utility.console_log("Timeout in expect", color="orange") + one_device_case_finish(False) + break + + +def get_case_info(one_case): + parent_case = one_case["name"] + child_case_num = one_case["child case num"] + return parent_case, child_case_num + + +def get_dut(duts, env, name, ut_config): + if name in duts: + dut = duts[name] + else: + dut = env.get_dut(name, app_path=ut_config) + duts[name] = dut + dut.start_app() + return dut + + +def case_run(duts, ut_config, env, one_case, failed_cases): + lock = threading.RLock() + threads = [] + send_signal_list = [] + failed_device = [] + result = True + parent_case, case_num = get_case_info(one_case) + for i in range(case_num): + dut = get_dut(duts, env, "dut%d" % i, ut_config) + threads.append(Handler(dut, send_signal_list, lock, + parent_case, i)) + for thread in threads: + thread.setDaemon(True) + thread.start() + for thread in threads: + thread.join() + result = result and thread.result + if not thread.result: + failed_device.append(thread.fail_name) + if result: + Utility.console_log("Success: " + one_case["name"], color="green") + else: + failed_cases.append(one_case["name"]) + Utility.console_log("Failed: " + one_case["name"], color="red") + + +@TinyFW.test_method(app=UT, dut=IDF.IDFDUT, chip="ESP32", module="master_slave_test_case", execution_time=1, + env_tag="UT_T2_1") +def multiple_devices_case(env, extra_data): + """ + extra_data can be two types of value + 1. as dict: + e.g. + {"name": "gpio master/slave test example", + "child case num": 2, + "config": "release", + "env_tag": "UT_T2_1"} + 2. as list dict: + e.g. + [{"name": "gpio master/slave test example1", + "child case num": 2, + "config": "release", + "env_tag": "UT_T2_1"}, + {"name": "gpio master/slave test example2", + "child case num": 2, + "config": "release", + "env_tag": "UT_T2_1"}] + + """ + failed_cases = [] + case_config = format_test_case_config(extra_data) + DUTS = {} + for ut_config in case_config: + for one_case in case_config[ut_config]: + case_run(DUTS, ut_config, env, one_case, failed_cases) + + if failed_cases: + Utility.console_log("Failed Cases:", color="red") + for _case_name in failed_cases: + Utility.console_log("\t" + _case_name, color="red") + raise AssertionError("Unit Test Failed") + +if __name__ == '__main__': + multiple_devices_case(extra_data={"name": "gpio master/slave test example", + "child case num": 2, + "config": "release", + "env_tag": "UT_T2_1"}) + + +