Merge branch 'test/enhanced_junit_test_result' into 'master'

test: enhanced junit test result

See merge request idf/esp-idf!2766
This commit is contained in:
He Yin Ling 2018-11-26 16:35:08 +08:00
commit 1c65f18422
9 changed files with 407 additions and 266 deletions

View File

@ -183,13 +183,13 @@ build_ssc_02:
# If you want to add new build ssc jobs, please add it into dependencies of `assign_test` and `.test_template`
build_esp_idf_tests:
.build_esp_idf_unit_test_template: &build_esp_idf_unit_test_template
<<: *build_template
artifacts:
paths:
- tools/unit-test-app/output
- components/idf_test/unit_test/TestCaseAll.yml
- components/idf_test/unit_test/CIConfigs/*.yml
expire_in: 2 days
only:
variables:
@ -197,11 +197,30 @@ build_esp_idf_tests:
- $BOT_LABEL_BUILD
- $BOT_LABEL_UNIT_TEST
- $BOT_LABEL_REGULAR_TEST
build_esp_idf_tests_make:
<<: *build_esp_idf_unit_test_template
script:
- export PATH="$IDF_PATH/tools:$PATH"
- cd $CI_PROJECT_DIR/tools/unit-test-app
- export EXTRA_CFLAGS="-Werror -Werror=deprecated-declarations"
- export EXTRA_CXXFLAGS=${EXTRA_CFLAGS}
- cd $CI_PROJECT_DIR/tools/unit-test-app
- MAKEFLAGS= make help # make sure kconfig tools are built in single process
- make ut-clean-all-configs
- make ut-build-all-configs
- python tools/UnitTestParser.py
- if [ "$UNIT_TEST_BUILD_SYSTEM" == "make" ]; then exit 0; fi
# If Make, delete the CMake built artifacts
- rm -rf builds output sdkconfig
- rm -rf components/idf_test/unit_test/TestCaseAll.yml
- rm -rf components/idf_test/unit_test/CIConfigs/*.yml
build_esp_idf_tests_cmake:
<<: *build_esp_idf_unit_test_template
script:
- export PATH="$IDF_PATH/tools:$PATH"
- export EXTRA_CFLAGS="-Werror -Werror=deprecated-declarations"
- export EXTRA_CXXFLAGS=${EXTRA_CFLAGS}
- cd $CI_PROJECT_DIR/tools/unit-test-app
# Build with CMake first
- idf.py ut-clean-all-configs
- idf.py ut-build-all-configs
@ -212,12 +231,6 @@ build_esp_idf_tests:
- rm -rf builds output sdkconfig
- rm -rf components/idf_test/unit_test/TestCaseAll.yml
- rm -rf components/idf_test/unit_test/CIConfigs/*.yml
# Then build with Make
- cd $CI_PROJECT_DIR/tools/unit-test-app
- MAKEFLAGS= make help # make sure kconfig tools are built in single process
- make ut-clean-all-configs
- make ut-build-all-configs
- python tools/UnitTestParser.py
.build_examples_make_template: &build_examples_make_template
<<: *build_template
@ -759,7 +772,8 @@ assign_test:
- build_ssc_00
- build_ssc_01
- build_ssc_02
- build_esp_idf_tests
- build_esp_idf_tests_make
- build_esp_idf_tests_cmake
variables:
TEST_FW_PATH: "$CI_PROJECT_DIR/tools/tiny-test-fw"
EXAMPLE_CONFIG_OUTPUT_PATH: "$CI_PROJECT_DIR/examples/test_configs"
@ -824,6 +838,8 @@ assign_test:
paths:
- $LOG_PATH
expire_in: 1 week
reports:
junit: $LOG_PATH/*/XUNIT_RESULT.xml
variables:
TEST_FW_PATH: "$CI_PROJECT_DIR/tools/tiny-test-fw"
TEST_CASE_PATH: "$CI_PROJECT_DIR/examples"
@ -846,7 +862,8 @@ assign_test:
stage: unit_test
dependencies:
- assign_test
- build_esp_idf_tests
- build_esp_idf_tests_make
- build_esp_idf_tests_cmake
only:
refs:
- master

View File

@ -205,12 +205,14 @@ class _RecvThread(threading.Thread):
PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
def __init__(self, read, data_cache):
def __init__(self, read, data_cache, recorded_data, record_data_lock):
super(_RecvThread, self).__init__()
self.exit_event = threading.Event()
self.setDaemon(True)
self.read = read
self.data_cache = data_cache
self.recorded_data = recorded_data
self.record_data_lock = record_data_lock
# cache the last line of recv data for collecting performance
self._line_cache = str()
@ -243,7 +245,10 @@ class _RecvThread(threading.Thread):
while not self.exit_event.isSet():
data = self.read(1000)
if data:
self.data_cache.put(data)
with self.record_data_lock:
self.data_cache.put(data)
for capture_id in self.recorded_data:
self.recorded_data[capture_id].put(data)
self.collect_performance(data)
def exit(self):
@ -274,6 +279,11 @@ class BaseDUT(object):
self.log_file = log_file
self.app = app
self.data_cache = _DataCache()
# the main process of recorded data are done in receive thread
# but receive thread could be closed in DUT lifetime (tool methods)
# so we keep it in BaseDUT, as their life cycle are same
self.recorded_data = dict()
self.record_data_lock = threading.RLock()
self.receive_thread = None
self.expect_failures = []
# open and start during init
@ -389,7 +399,8 @@ class BaseDUT(object):
:return: None
"""
self._port_open()
self.receive_thread = _RecvThread(self._port_read, self.data_cache)
self.receive_thread = _RecvThread(self._port_read, self.data_cache,
self.recorded_data, self.record_data_lock)
self.receive_thread.start()
def close(self):
@ -448,6 +459,42 @@ class BaseDUT(object):
self.data_cache.flush(size)
return data
def start_capture_raw_data(self, capture_id="default"):
"""
Sometime application want to get DUT raw data and use ``expect`` method at the same time.
Capture methods provides a way to get raw data without affecting ``expect`` or ``read`` method.
If you call ``start_capture_raw_data`` with same capture id again, it will restart capture on this ID.
:param capture_id: ID of capture. You can use different IDs to do different captures at the same time.
"""
with self.record_data_lock:
try:
# if start capture on existed ID, we do flush data and restart capture
self.recorded_data[capture_id].flush()
except KeyError:
# otherwise, create new data cache
self.recorded_data[capture_id] = _DataCache()
def stop_capture_raw_data(self, capture_id="default"):
"""
Stop capture and get raw data.
This method should be used after ``start_capture_raw_data`` on the same capture ID.
:param capture_id: ID of capture.
:return: captured raw data between start capture and stop capture.
"""
with self.record_data_lock:
try:
ret = self.recorded_data[capture_id].get_data()
self.recorded_data.pop(capture_id)
except KeyError as e:
e.message = "capture_id does not exist. " \
"You should call start_capture_raw_data with same ID " \
"before calling stop_capture_raw_data"
raise e
return ret
# expect related methods
@staticmethod

View File

@ -77,7 +77,11 @@ def log_performance(item, value):
:param item: performance item name
:param value: performance value
"""
Utility.console_log("[Performance][{}]: {}".format(item, value), "orange")
performance_msg = "[Performance][{}]: {}".format(item, value)
Utility.console_log(performance_msg, "orange")
# update to junit test report
current_junit_case = TinyFW.JunitReport.get_current_test_case()
current_junit_case.stdout += performance_msg + "\r\n"
def check_performance(item, value):

View File

@ -13,14 +13,12 @@
# limitations under the License.
""" Interface for test cases. """
import sys
import os
import time
import traceback
import inspect
import functools
import xunitgen
import junit_xml
import Env
import DUT
@ -28,11 +26,6 @@ import App
import Utility
XUNIT_FILE_NAME = "XUNIT_RESULT.xml"
XUNIT_RECEIVER = xunitgen.EventReceiver()
XUNIT_DEFAULT_TEST_SUITE = "test-suite"
class DefaultEnvConfig(object):
"""
default test configs. There're 3 places to set configs, priority is (high -> low):
@ -69,40 +62,6 @@ set_default_config = DefaultEnvConfig.set_default_config
get_default_config = DefaultEnvConfig.get_default_config
class TestResult(object):
TEST_RESULT = {
"pass": [],
"fail": [],
}
@classmethod
def get_failed_cases(cls):
"""
:return: failed test cases
"""
return cls.TEST_RESULT["fail"]
@classmethod
def get_passed_cases(cls):
"""
:return: passed test cases
"""
return cls.TEST_RESULT["pass"]
@classmethod
def set_result(cls, result, case_name):
"""
:param result: True or False
:param case_name: test case name
:return: None
"""
cls.TEST_RESULT["pass" if result else "fail"].append(case_name)
get_failed_cases = TestResult.get_failed_cases
get_passed_cases = TestResult.get_passed_cases
MANDATORY_INFO = {
"execution_time": 1,
"env_tag": "default",
@ -111,6 +70,61 @@ MANDATORY_INFO = {
}
class JunitReport(object):
# wrapper for junit test report
# TODO: Don't support by multi-thread (although not likely to be used this way).
JUNIT_FILE_NAME = "XUNIT_RESULT.xml"
JUNIT_DEFAULT_TEST_SUITE = "test-suite"
JUNIT_TEST_SUITE = junit_xml.TestSuite(JUNIT_DEFAULT_TEST_SUITE)
JUNIT_CURRENT_TEST_CASE = None
_TEST_CASE_CREATED_TS = 0
@classmethod
def output_report(cls, junit_file_path):
""" Output current test result to file. """
with open(os.path.join(junit_file_path, cls.JUNIT_FILE_NAME), "w") as f:
cls.JUNIT_TEST_SUITE.to_file(f, [cls.JUNIT_TEST_SUITE], prettyprint=False)
@classmethod
def get_current_test_case(cls):
"""
By default, the test framework will handle junit test report automatically.
While some test case might want to update some info to test report.
They can use this method to get current test case created by test framework.
:return: current junit test case instance created by ``JunitTestReport.create_test_case``
"""
return cls.JUNIT_CURRENT_TEST_CASE
@classmethod
def test_case_finish(cls, test_case):
"""
Append the test case to test suite so it can be output to file.
Execution time will be automatically updated (compared to ``create_test_case``).
"""
test_case.elapsed_sec = time.time() - cls._TEST_CASE_CREATED_TS
cls.JUNIT_TEST_SUITE.test_cases.append(test_case)
@classmethod
def create_test_case(cls, name):
"""
Extend ``junit_xml.TestCase`` with:
1. save create test case so it can be get by ``get_current_test_case``
2. log create timestamp, so ``elapsed_sec`` can be auto updated in ``test_case_finish``.
:param name: test case name
:return: instance of ``junit_xml.TestCase``
"""
# set stdout to empty string, so we can always append string to stdout.
# It won't affect output logic. If stdout is empty, it won't be put to report.
test_case = junit_xml.TestCase(name, stdout="")
cls.JUNIT_CURRENT_TEST_CASE = test_case
cls._TEST_CASE_CREATED_TS = time.time()
return test_case
def test_method(**kwargs):
"""
decorator for test case function.
@ -124,14 +138,15 @@ def test_method(**kwargs):
:keyword env_config_file: test env config file. usually will not set this keyword when define case
:keyword test_suite_name: test suite name, used for generating log folder name and adding xunit format test result.
usually will not set this keyword when define case
:keyword junit_report_by_case: By default the test fw will handle junit report generation.
In some cases, one test function might test many test cases.
If this flag is set, test case can update junit report by its own.
"""
def test(test_func):
# get test function file name
frame = inspect.stack()
test_func_file_name = frame[1][1]
case_info = MANDATORY_INFO.copy()
case_info["name"] = case_info["ID"] = test_func.__name__
case_info["junit_report_by_case"] = False
case_info.update(kwargs)
@functools.wraps(test_func)
@ -151,11 +166,12 @@ def test_method(**kwargs):
env_config.update(overwrite)
env_inst = Env.Env(**env_config)
# prepare for xunit test results
xunit_file = os.path.join(env_inst.app_cls.get_log_folder(env_config["test_suite_name"]),
XUNIT_FILE_NAME)
XUNIT_RECEIVER.begin_case(test_func.__name__, time.time(), test_func_file_name)
junit_file_path = env_inst.app_cls.get_log_folder(env_config["test_suite_name"])
junit_test_case = JunitReport.create_test_case(case_info["name"])
result = False
try:
Utility.console_log("starting running test: " + test_func.__name__, color="green")
# execute test function
@ -166,21 +182,20 @@ def test_method(**kwargs):
# handle all the exceptions here
traceback.print_exc()
# log failure
XUNIT_RECEIVER.failure(str(e), test_func_file_name)
junit_test_case.add_failure_info(str(e) + ":\r\n" + traceback.format_exc())
finally:
if not case_info["junit_report_by_case"]:
JunitReport.test_case_finish(junit_test_case)
# do close all DUTs, if result is False then print DUT debug info
env_inst.close(dut_debug=(not result))
# end case and output result
XUNIT_RECEIVER.end_case(test_func.__name__, time.time())
with open(xunit_file, "ab+") as f:
f.write(xunitgen.toxml(XUNIT_RECEIVER.results(),
XUNIT_DEFAULT_TEST_SUITE))
JunitReport.output_report(junit_file_path)
if result:
Utility.console_log("Test Succeed: " + test_func.__name__, color="green")
else:
Utility.console_log(("Test Fail: " + test_func.__name__), color="red")
TestResult.set_result(result, test_func.__name__)
return result
handle_test.case_info = case_info

View File

@ -143,6 +143,7 @@ class AssignTest(object):
for job_name in ci_config:
if self.CI_TEST_JOB_PATTERN.search(job_name) is not None:
job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name))
job_list.sort(key=lambda x: x["name"])
return job_list
def _search_cases(self, test_case_path, case_filter=None):

View File

@ -186,7 +186,7 @@ The following 3rd party lib is required:
* pyserial
* pyyaml
* xunitgen
* junit_xml
* netifaces
* matplotlib (if use Utility.LineChart)

View File

@ -1,5 +1,5 @@
pyserial
pyyaml
xunitgen
junit_xml
netifaces
matplotlib

View File

@ -282,7 +282,7 @@ class Parser(object):
config_output_folder = os.path.join(output_folder, config)
if os.path.exists(config_output_folder):
test_cases.extend(self.parse_test_cases_for_one_config(configs_folder, config_output_folder, config))
test_cases.sort(key=lambda x: x["config"] + x["summary"])
self.dump_test_cases(test_cases)

View File

@ -23,7 +23,6 @@ import os
import sys
import time
import argparse
import threading
# if we want to run test case outside `tiny-test-fw` folder,
@ -53,7 +52,7 @@ SIMPLE_TEST_ID = 0
MULTI_STAGE_ID = 1
MULTI_DEVICE_ID = 2
DEFAULT_TIMEOUT=20
DEFAULT_TIMEOUT = 20
DUT_STARTUP_CHECK_RETRY_COUNT = 5
TEST_HISTROY_CHECK_TIMEOUT = 1
@ -132,6 +131,7 @@ def format_test_case_config(test_case_data):
return case_config
def replace_app_bin(dut, name, new_app_bin):
if new_app_bin is None:
return
@ -142,13 +142,15 @@ def replace_app_bin(dut, name, new_app_bin):
Utility.console_log("The replaced application binary is {}".format(new_app_bin), "O")
break
def reset_dut(dut):
dut.reset()
# esptool ``run`` cmd takes quite long time.
# before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened.
# this could cause checking bootup print failed.
# now use input cmd `-` and check test history to check if DUT is bootup.
# we'll retry this step for a few times in case `dut.reset` returns during DUT bootup (when DUT can't process any command).
# we'll retry this step for a few times,
# in case `dut.reset` returns during DUT bootup (when DUT can't process any command).
for _ in range(DUT_STARTUP_CHECK_RETRY_COUNT):
dut.write("-")
try:
@ -157,10 +159,86 @@ def reset_dut(dut):
except ExpectTimeout:
pass
else:
raise AssertationError("Reset {} ({}) failed!".format(dut.name, dut.port))
raise AssertionError("Reset {} ({}) failed!".format(dut.name, dut.port))
@IDF.idf_unit_test(env_tag="UT_T1_1")
def run_one_normal_case(dut, one_case, junit_test_case, failed_cases):
reset_dut(dut)
dut.start_capture_raw_data()
# run test case
dut.write("\"{}\"".format(one_case["name"]))
dut.expect("Running " + one_case["name"] + "...")
exception_reset_list = []
# we want to set this flag in callbacks (inner functions)
# use list here so we can use append to set this flag
test_finish = list()
# expect callbacks
def one_case_finish(result):
""" one test finished, let expect loop break and log result """
test_finish.append(True)
output = dut.stop_capture_raw_data()
if result:
Utility.console_log("Success: " + one_case["name"], color="green")
else:
failed_cases.append(one_case["name"])
Utility.console_log("Failed: " + one_case["name"], color="red")
junit_test_case.add_failure_info(output)
def handle_exception_reset(data):
"""
just append data to exception list.
exception list will be checked in ``handle_reset_finish``, once reset finished.
"""
exception_reset_list.append(data[0])
def handle_test_finish(data):
""" test finished without reset """
# in this scenario reset should not happen
assert not exception_reset_list
if int(data[1]):
# case ignored
Utility.console_log("Ignored: " + one_case["name"], color="orange")
junit_test_case.add_skipped_info("ignored")
one_case_finish(not int(data[0]))
def handle_reset_finish(data):
""" reset happened and reboot finished """
assert exception_reset_list # reboot but no exception/reset logged. should never happen
result = False
if len(one_case["reset"]) == len(exception_reset_list):
for i, exception in enumerate(exception_reset_list):
if one_case["reset"][i] not in exception:
break
else:
result = True
if not result:
err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"],
exception_reset_list)
Utility.console_log(err_msg, color="orange")
junit_test_case.add_error_info(err_msg)
one_case_finish(result)
while not test_finish:
try:
dut.expect_any((RESET_PATTERN, handle_exception_reset),
(EXCEPTION_PATTERN, handle_exception_reset),
(ABORT_PATTERN, handle_exception_reset),
(FINISH_PATTERN, handle_test_finish),
(UT_APP_BOOT_UP_DONE, handle_reset_finish),
timeout=one_case["timeout"])
except ExpectTimeout:
Utility.console_log("Timeout in expect", color="orange")
junit_test_case.add_error_info("timeout")
one_case_finish(False)
break
@IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
def run_unit_test_cases(env, extra_data):
"""
extra_data can be three types of value
@ -173,6 +251,7 @@ def run_unit_test_cases(env, extra_data):
3. as list of string or dict:
[case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...]
:param env: test env instance
:param extra_data: the case name or case list or case dictionary
:return: None
"""
@ -190,74 +269,17 @@ def run_unit_test_cases(env, extra_data):
if len(case_config[ut_config]) > 0:
replace_app_bin(dut, "unit-test-app", case_config[ut_config][0].get('app_bin'))
dut.start_app()
Utility.console_log("Download finished, start running test cases", "O")
for one_case in case_config[ut_config]:
reset_dut(dut)
# run test case
dut.write("\"{}\"".format(one_case["name"]))
dut.expect("Running " + one_case["name"] + "...")
exception_reset_list = []
# we want to set this flag in callbacks (inner functions)
# use list here so we can use append to set this flag
test_finish = list()
# expect callbacks
def one_case_finish(result):
""" one test finished, let expect loop break and log result """
test_finish.append(True)
if result:
Utility.console_log("Success: " + one_case["name"], color="green")
else:
failed_cases.append(one_case["name"])
Utility.console_log("Failed: " + one_case["name"], color="red")
def handle_exception_reset(data):
"""
just append data to exception list.
exception list will be checked in ``handle_reset_finish``, once reset finished.
"""
exception_reset_list.append(data[0])
def handle_test_finish(data):
""" test finished without reset """
# in this scenario reset should not happen
assert not exception_reset_list
if int(data[1]):
# case ignored
Utility.console_log("Ignored: " + one_case["name"], color="orange")
one_case_finish(not int(data[0]))
def handle_reset_finish(data):
""" reset happened and reboot finished """
assert exception_reset_list # reboot but no exception/reset logged. should never happen
result = False
if len(one_case["reset"]) == len(exception_reset_list):
for i, exception in enumerate(exception_reset_list):
if one_case["reset"][i] not in exception:
break
else:
result = True
if not result:
Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}"""
.format(one_case["reset"], exception_reset_list),
color="orange")
one_case_finish(result)
while not test_finish:
try:
dut.expect_any((RESET_PATTERN, handle_exception_reset),
(EXCEPTION_PATTERN, handle_exception_reset),
(ABORT_PATTERN, handle_exception_reset),
(FINISH_PATTERN, handle_test_finish),
(UT_APP_BOOT_UP_DONE, handle_reset_finish),
timeout=one_case["timeout"])
except ExpectTimeout:
Utility.console_log("Timeout in expect", color="orange")
one_case_finish(False)
break
# create junit report test case
junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
try:
run_one_normal_case(dut, one_case, junit_test_case, failed_cases)
TinyFW.JunitReport.test_case_finish(junit_test_case)
except Exception as e:
junit_test_case.add_error_info("Unexpected exception: " + str(e))
TinyFW.JunitReport.test_case_finish(junit_test_case)
# raise exception if any case fails
if failed_cases:
@ -267,7 +289,6 @@ def run_unit_test_cases(env, extra_data):
raise AssertionError("Unit Test Failed")
class Handler(threading.Thread):
WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)\]!')
@ -283,6 +304,7 @@ class Handler(threading.Thread):
self.child_case_index = child_case_index + 1
self.finish = False
self.result = False
self.output = ""
self.fail_name = None
self.timeout = timeout
self.force_stop = threading.Event() # it show the running status
@ -292,6 +314,9 @@ class Handler(threading.Thread):
threading.Thread.__init__(self, name="{} Handler".format(dut))
def run(self):
self.dut.start_capture_raw_data()
def get_child_case_name(data):
self.child_case_name = data[0]
time.sleep(1)
@ -301,6 +326,8 @@ class Handler(threading.Thread):
""" one test finished, let expect loop break and log result """
self.finish = True
self.result = result
self.output = "[{}]\n\n{}\n".format(self.child_case_name,
self.dut.stop_capture_raw_data())
if not result:
self.fail_name = self.child_case_name
@ -309,7 +336,7 @@ class Handler(threading.Thread):
expected_signal = data[0]
while 1:
if time.time() > start_time + self.timeout:
Utility.console_log("Timeout in device for function: %s"%self.child_case_name, color="orange")
Utility.console_log("Timeout in device for function: %s" % self.child_case_name, color="orange")
break
with self.lock:
if expected_signal in self.sent_signal_list:
@ -330,7 +357,6 @@ class Handler(threading.Thread):
Utility.console_log("Ignored: " + self.child_case_name, color="orange")
one_device_case_finish(not int(data[0]))
try:
time.sleep(1)
self.dut.write("\"{}\"".format(self.parent_case_name))
@ -339,7 +365,8 @@ class Handler(threading.Thread):
Utility.console_log("No case detected!", color="orange")
while not self.finish and not self.force_stop.isSet():
try:
self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), get_child_case_name),
self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'),
get_child_case_name),
(self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern
(self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern
(self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern
@ -366,11 +393,11 @@ def get_dut(duts, env, name, ut_config, app_bin=None):
dut = env.get_dut(name, app_path=ut_config)
duts[name] = dut
replace_app_bin(dut, "unit-test-app", app_bin)
dut.start_app() # download bin to board
dut.start_app() # download bin to board
return dut
def case_run(duts, ut_config, env, one_case, failed_cases, app_bin):
def run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, app_bin, junit_test_case):
lock = threading.RLock()
threads = []
send_signal_list = []
@ -384,9 +411,11 @@ def case_run(duts, ut_config, env, one_case, failed_cases, app_bin):
for thread in threads:
thread.setDaemon(True)
thread.start()
output = "Multiple Device Failed\n"
for thread in threads:
thread.join()
result = result and thread.result
output += thread.output
if not thread.result:
[thd.stop() for thd in threads]
@ -394,10 +423,11 @@ def case_run(duts, ut_config, env, one_case, failed_cases, app_bin):
Utility.console_log("Success: " + one_case["name"], color="green")
else:
failed_cases.append(one_case["name"])
junit_test_case.add_failure_info(output)
Utility.console_log("Failed: " + one_case["name"], color="red")
@IDF.idf_unit_test(env_tag="UT_T2_1")
@IDF.idf_unit_test(env_tag="UT_T2_1", junit_report_by_case=True)
def run_multiple_devices_cases(env, extra_data):
"""
extra_data can be two types of value
@ -421,11 +451,18 @@ def run_multiple_devices_cases(env, extra_data):
"""
failed_cases = []
case_config = format_test_case_config(extra_data)
DUTS = {}
duts = {}
for ut_config in case_config:
Utility.console_log("Running unit test for config: " + ut_config, "O")
for one_case in case_config[ut_config]:
case_run(DUTS, ut_config, env, one_case, failed_cases, one_case.get('app_bin'))
junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
try:
run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases,
one_case.get('app_bin'), junit_test_case)
TinyFW.JunitReport.test_case_finish(junit_test_case)
except Exception as e:
junit_test_case.add_error_info("Unexpected exception: " + str(e))
TinyFW.JunitReport.test_case_finish(junit_test_case)
if failed_cases:
Utility.console_log("Failed Cases:", color="red")
@ -434,7 +471,109 @@ def run_multiple_devices_cases(env, extra_data):
raise AssertionError("Unit Test Failed")
@IDF.idf_unit_test(env_tag="UT_T1_1")
def run_one_multiple_stage_case(dut, one_case, failed_cases, junit_test_case):
reset_dut(dut)
dut.start_capture_raw_data()
exception_reset_list = []
for test_stage in range(one_case["child case num"]):
# select multi stage test case name
dut.write("\"{}\"".format(one_case["name"]))
dut.expect("Running " + one_case["name"] + "...")
# select test function for current stage
dut.write(str(test_stage + 1))
# we want to set this flag in callbacks (inner functions)
# use list here so we can use append to set this flag
stage_finish = list()
def last_stage():
return test_stage == one_case["child case num"] - 1
def check_reset():
if one_case["reset"]:
assert exception_reset_list # reboot but no exception/reset logged. should never happen
result = False
if len(one_case["reset"]) == len(exception_reset_list):
for i, exception in enumerate(exception_reset_list):
if one_case["reset"][i] not in exception:
break
else:
result = True
if not result:
err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"],
exception_reset_list)
Utility.console_log(err_msg, color="orange")
junit_test_case.add_error_info(err_msg)
else:
# we allow omit reset in multi stage cases
result = True
return result
# expect callbacks
def one_case_finish(result):
""" one test finished, let expect loop break and log result """
# handle test finish
result = result and check_reset()
output = dut.stop_capture_raw_data()
if result:
Utility.console_log("Success: " + one_case["name"], color="green")
else:
failed_cases.append(one_case["name"])
Utility.console_log("Failed: " + one_case["name"], color="red")
junit_test_case.add_failure_info(output)
stage_finish.append("break")
def handle_exception_reset(data):
"""
just append data to exception list.
exception list will be checked in ``handle_reset_finish``, once reset finished.
"""
exception_reset_list.append(data[0])
def handle_test_finish(data):
""" test finished without reset """
# in this scenario reset should not happen
if int(data[1]):
# case ignored
Utility.console_log("Ignored: " + one_case["name"], color="orange")
junit_test_case.add_skipped_info("ignored")
# only passed in last stage will be regarded as real pass
if last_stage():
one_case_finish(not int(data[0]))
else:
Utility.console_log("test finished before enter last stage", color="orange")
one_case_finish(False)
def handle_next_stage(data):
""" reboot finished. we goto next stage """
if last_stage():
# already last stage, should never goto next stage
Utility.console_log("didn't finish at last stage", color="orange")
one_case_finish(False)
else:
stage_finish.append("continue")
while not stage_finish:
try:
dut.expect_any((RESET_PATTERN, handle_exception_reset),
(EXCEPTION_PATTERN, handle_exception_reset),
(ABORT_PATTERN, handle_exception_reset),
(FINISH_PATTERN, handle_test_finish),
(UT_APP_BOOT_UP_DONE, handle_next_stage),
timeout=one_case["timeout"])
except ExpectTimeout:
Utility.console_log("Timeout in expect", color="orange")
one_case_finish(False)
break
if stage_finish[0] == "break":
# test breaks on current stage
break
@IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
def run_multiple_stage_cases(env, extra_data):
"""
extra_data can be 2 types of value
@ -442,6 +581,7 @@ def run_multiple_stage_cases(env, extra_data):
3. as list of string or dict:
[case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...]
:param env: test env instance
:param extra_data: the case name or case list or case dictionary
:return: None
"""
@ -461,98 +601,13 @@ def run_multiple_stage_cases(env, extra_data):
dut.start_app()
for one_case in case_config[ut_config]:
reset_dut(dut)
exception_reset_list = []
for test_stage in range(one_case["child case num"]):
# select multi stage test case name
dut.write("\"{}\"".format(one_case["name"]))
dut.expect("Running " + one_case["name"] + "...")
# select test function for current stage
dut.write(str(test_stage + 1))
# we want to set this flag in callbacks (inner functions)
# use list here so we can use append to set this flag
stage_finish = list()
def last_stage():
return test_stage == one_case["child case num"] - 1
def check_reset():
if one_case["reset"]:
assert exception_reset_list # reboot but no exception/reset logged. should never happen
result = False
if len(one_case["reset"]) == len(exception_reset_list):
for i, exception in enumerate(exception_reset_list):
if one_case["reset"][i] not in exception:
break
else:
result = True
if not result:
Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}"""
.format(one_case["reset"], exception_reset_list),
color="orange")
else:
# we allow omit reset in multi stage cases
result = True
return result
# expect callbacks
def one_case_finish(result):
""" one test finished, let expect loop break and log result """
# handle test finish
result = result and check_reset()
if result:
Utility.console_log("Success: " + one_case["name"], color="green")
else:
failed_cases.append(one_case["name"])
Utility.console_log("Failed: " + one_case["name"], color="red")
stage_finish.append("break")
def handle_exception_reset(data):
"""
just append data to exception list.
exception list will be checked in ``handle_reset_finish``, once reset finished.
"""
exception_reset_list.append(data[0])
def handle_test_finish(data):
""" test finished without reset """
# in this scenario reset should not happen
if int(data[1]):
# case ignored
Utility.console_log("Ignored: " + one_case["name"], color="orange")
# only passed in last stage will be regarded as real pass
if last_stage():
one_case_finish(not int(data[0]))
else:
Utility.console_log("test finished before enter last stage", color="orange")
one_case_finish(False)
def handle_next_stage(data):
""" reboot finished. we goto next stage """
if last_stage():
# already last stage, should never goto next stage
Utility.console_log("didn't finish at last stage", color="orange")
one_case_finish(False)
else:
stage_finish.append("continue")
while not stage_finish:
try:
dut.expect_any((RESET_PATTERN, handle_exception_reset),
(EXCEPTION_PATTERN, handle_exception_reset),
(ABORT_PATTERN, handle_exception_reset),
(FINISH_PATTERN, handle_test_finish),
(UT_APP_BOOT_UP_DONE, handle_next_stage),
timeout=one_case["timeout"])
except ExpectTimeout:
Utility.console_log("Timeout in expect", color="orange")
one_case_finish(False)
break
if stage_finish[0] == "break":
# test breaks on current stage
break
junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
try:
run_one_multiple_stage_case(dut, one_case, failed_cases, junit_test_case)
TinyFW.JunitReport.test_case_finish(junit_test_case)
except Exception as e:
junit_test_case.add_error_info("Unexpected exception: " + str(e))
TinyFW.JunitReport.test_case_finish(junit_test_case)
# raise exception if any case fails
if failed_cases:
@ -561,6 +616,7 @@ def run_multiple_stage_cases(env, extra_data):
Utility.console_log("\t" + _case_name, color="red")
raise AssertionError("Unit Test Failed")
def detect_update_unit_test_info(env, extra_data, app_bin):
case_config = format_test_case_config(extra_data)
@ -576,14 +632,14 @@ def detect_update_unit_test_info(env, extra_data, app_bin):
dut.write("")
dut.expect("Here's the test menu, pick your combo:", timeout=DEFAULT_TIMEOUT)
def find_update_dic(name, t, timeout, child_case_num=None):
for dic in extra_data:
if dic['name'] == name:
dic['type'] = t
if 'timeout' not in dic:
dic['timeout'] = timeout
def find_update_dic(name, _t, _timeout, child_case_num=None):
for _case_data in extra_data:
if _case_data['name'] == name:
_case_data['type'] = _t
if 'timeout' not in _case_data:
_case_data['timeout'] = _timeout
if child_case_num:
dic['child case num'] = child_case_num
_case_data['child case num'] = child_case_num
try:
while True:
@ -613,9 +669,9 @@ def detect_update_unit_test_info(env, extra_data, app_bin):
if data[1] and re.search(END_LIST_STR, data[1]):
break
# check if the unit test case names are correct, i.e. they could be found in the device
for dic in extra_data:
if 'type' not in dic:
raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(dic.get('name')))
for _dic in extra_data:
if 'type' not in _dic:
raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(_dic.get('name')))
except ExpectTimeout:
Utility.console_log("Timeout during getting the test list", color="red")
finally:
@ -624,6 +680,7 @@ def detect_update_unit_test_info(env, extra_data, app_bin):
# These options are the same for all configs, therefore there is no need to continue
break
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
@ -633,13 +690,13 @@ if __name__ == '__main__':
default=1
)
parser.add_argument("--env_config_file", "-e",
help="test env config file",
default=None
)
help="test env config file",
default=None
)
parser.add_argument("--app_bin", "-b",
help="application binary file for flashing the chip",
default=None
)
help="application binary file for flashing the chip",
default=None
)
parser.add_argument(
'test',
help='Comma separated list of <option>:<argument> where option can be "name" (default), "child case num", \
@ -673,12 +730,12 @@ if __name__ == '__main__':
env_config['app'] = UT
env_config['dut'] = IDF.IDFDUT
env_config['test_suite_name'] = 'unit_test_parsing'
env = Env.Env(**env_config)
detect_update_unit_test_info(env, extra_data=list_of_dicts, app_bin=args.app_bin)
test_env = Env.Env(**env_config)
detect_update_unit_test_info(test_env, extra_data=list_of_dicts, app_bin=args.app_bin)
for i in range(1, args.repeat+1):
for index in range(1, args.repeat+1):
if args.repeat > 1:
Utility.console_log("Repetition {}".format(i), color="green")
Utility.console_log("Repetition {}".format(index), color="green")
for dic in list_of_dicts:
t = dic.get('type', SIMPLE_TEST_ID)
if t == SIMPLE_TEST_ID: