# SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0

import logging
import os
import typing as t
from fnmatch import fnmatch
from xml.etree import ElementTree as ET

import pytest
from _pytest.config import ExitCode
from _pytest.main import Session
from _pytest.python import Function
from _pytest.reports import TestReport
from _pytest.runner import CallInfo
from _pytest.terminal import TerminalReporter
from pytest_embedded.plugin import parse_multi_dut_args
from pytest_embedded.utils import find_by_suffix, to_list

from .constants import DEFAULT_SDKCONFIG, PREVIEW_TARGETS, SUPPORTED_TARGETS, PytestApp, PytestCase
from .utils import format_case_id, merge_junit_files

# stash keys used to share per-item state between hook phases
IDF_PYTEST_EMBEDDED_KEY = pytest.StashKey['IdfPytestEmbedded']()
ITEM_FAILED_CASES_KEY = pytest.StashKey[list]()
ITEM_FAILED_KEY = pytest.StashKey[bool]()


class IdfPytestEmbedded:
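    """Pytest plugin that marks, filters, and reports ESP-IDF test cases for a single target."""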

    def __init__(
        self,
        target: str,
        sdkconfig: t.Optional[str] = None,
        known_failure_cases_file: t.Optional[str] = None,
        apps_list: t.Optional[t.List[str]] = None,
    ):
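        """
        Args:
            target: target name used for filtering test cases; stored lower-cased.
            sdkconfig: if set, only collect test cases parametrized with this sdkconfig name.
            known_failure_cases_file: text file with one fnmatch pattern per line; matching
                failures do not fail the session.
            apps_list: if set, only collect test cases whose build directories are listed.
                Should only be used in CI.
        """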
        # CLI options to filter the test cases
        self.target = target.lower()
        self.sdkconfig = sdkconfig
        self.known_failure_patterns = self._parse_known_failure_cases_file(known_failure_cases_file)
        self.apps_list = apps_list

        self.cases: t.List[PytestCase] = []

        self._failed_cases: t.List[t.Tuple[str, bool, bool]] = []  # (test_case_name, is_known_failure, is_xfail)

    @property
    def failed_cases(self) -> t.List[str]:
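        """Names of failed cases that are neither known failures nor xfail."""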
        return [case for case, is_known, is_xfail in self._failed_cases if not is_known and not is_xfail]

    @property
    def known_failure_cases(self) -> t.List[str]:
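        """Names of failed cases that match a known-failure pattern."""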
        return [case for case, is_known, _ in self._failed_cases if is_known]

    @property
    def xfail_cases(self) -> t.List[str]:
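        """Names of failed cases that are marked as xfail."""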
        return [case for case, _, is_xfail in self._failed_cases if is_xfail]

    @staticmethod
    def _parse_known_failure_cases_file(
        known_failure_cases_file: t.Optional[str] = None,
    ) -> t.List[str]:
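        """Read fnmatch patterns from the file, one per line; '#' starts a comment."""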
        if not known_failure_cases_file or not os.path.isfile(known_failure_cases_file):
            return []

        patterns = []
        with open(known_failure_cases_file) as fr:
            for line in fr:
                # drop comments; this also skips empty and comment-only lines
                without_comments = line.split('#')[0].strip()
                if without_comments:
                    patterns.append(without_comments)

        return patterns

    @staticmethod
    def get_param(item: Function, key: str, default: t.Any = None) -> t.Any:
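        """Return the parametrized value of ``key`` for this item, falling back to ``default``."""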
        # Implemented this way because of a pytest limitation: fixture values
        # cannot be retrieved during collection.
        # https://github.com/pytest-dev/pytest/discussions/9689
        if not hasattr(item, 'callspec'):
            raise ValueError(f'Function {item} does not have params')

        return item.callspec.params.get(key, default) or default

    def item_to_pytest_case(self, item: Function) -> PytestCase:
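        """Convert a collected item into a PytestCase, resolving per-DUT app paths, configs, and targets."""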
        count = 1
        case_path = str(item.path)
        case_name = item.originalname
        target = self.target

        # funcargs are not calculated during collection
        if hasattr(item, 'callspec'):
            count = item.callspec.params.get('count', 1)
            app_paths = to_list(
                parse_multi_dut_args(
                    count,
                    self.get_param(item, 'app_path', os.path.dirname(case_path)),
                )
            )
            configs = to_list(parse_multi_dut_args(count, self.get_param(item, 'config', 'default')))
            targets = to_list(parse_multi_dut_args(count, self.get_param(item, 'target', target)))
        else:
            app_paths = [os.path.dirname(case_path)]
            configs = ['default']
            targets = [target]

        case_apps = set()
        for i in range(count):
            case_apps.add(PytestApp(app_paths[i], targets[i], configs[i]))

        return PytestCase(
            case_path,
            case_name,
            case_apps,
            self.target,
            item,
        )

    @pytest.hookimpl(tryfirst=True)
    def pytest_sessionstart(self, session: Session) -> None:
        # mirror the behavior of vanilla pytest-embedded's '--target' option
        session.config.option.target = self.target

    @pytest.hookimpl(tryfirst=True)
    def pytest_collection_modifyitems(self, items: t.List[Function]) -> None:
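        """Attach default markers to each collected item, then filter the items by
        nightly-run status, target, sdkconfig, and app list."""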
        item_to_case: t.Dict[Function, PytestCase] = {}

        # Add markers to the test cases
        for item in items:
            # generate a PytestCase for each item
            case = self.item_to_pytest_case(item)
            item_to_case[item] = case

            # set a default timeout of 10 minutes for each case
            if 'timeout' not in item.keywords:
                item.add_marker(pytest.mark.timeout(10 * 60))

            # expand the special markers into per-target markers
            if 'supported_targets' in item.keywords:
                for _target in SUPPORTED_TARGETS:
                    item.add_marker(_target)
            if 'preview_targets' in item.keywords:
                for _target in PREVIEW_TARGETS:
                    item.add_marker(_target)
            if 'all_targets' in item.keywords:
                for _target in [*SUPPORTED_TARGETS, *PREVIEW_TARGETS]:
                    item.add_marker(_target)

            # add 'xtal_40mhz' as the default tag for esp32c2 cases
            # that are not already marked with 'xtal_26mhz'
            if self.target == 'esp32c2' and 'esp32c2' in case.target_markers and 'xtal_26mhz' not in case.all_markers:
                item.add_marker('xtal_40mhz')

        # Filter the test cases
        filtered_items = []
        for item in items:
            case = item_to_case[item]

            # filter by the "nightly_run" marker
            if os.getenv('INCLUDE_NIGHTLY_RUN') == '1':
                # do not filter out nightly_run cases
                pass
            elif os.getenv('NIGHTLY_RUN') == '1':
                if not case.is_nightly_run:
                    logging.debug('Skipping test case %s because it is not a nightly-run test case', item.name)
                    continue
            else:
                if case.is_nightly_run:
                    logging.debug('Skipping test case %s because it is a nightly-run test case', item.name)
                    continue

            # filter by target
            if self.target not in case.target_markers:
                continue

            if self.target in case.skipped_targets:
                continue

            # filter by sdkconfig
            if self.sdkconfig:
                if self.get_param(item, 'config', DEFAULT_SDKCONFIG) != self.sdkconfig:
                    continue

            # filter by apps_list: skip the test case if its app is not listed.
            # should only be used in CI
            if self.apps_list is not None:
                bin_not_found = False
                for case_app in case.apps:
                    # in CI, always use build_<target>_<config> as the build dir
                    binary_path = os.path.join(case_app.path, f'build_{case_app.target}_{case_app.config}')
                    if binary_path not in self.apps_list:
                        logging.info(
                            'Skipping test case %s because binary path %s is not listed in the app info list files',
                            item.name,
                            binary_path,
                        )
                        bin_not_found = True
                        break

                if bin_not_found:
                    continue

            # finally!
            filtered_items.append(item)

        items[:] = filtered_items

    def pytest_report_collectionfinish(self, items: t.List[Function]) -> None:
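        """Record all finally-collected test cases."""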
        for item in items:
            self.cases.append(self.item_to_pytest_case(item))

    def pytest_runtest_makereport(self, item: Function, call: CallInfo[None]) -> t.Optional[TestReport]:
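        """Build the test report and record the names of the failed (sub-)cases."""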
        report = TestReport.from_item_and_call(item, call)
        if item.stash.get(ITEM_FAILED_KEY, None) is None:
            item.stash[ITEM_FAILED_KEY] = False

        if report.outcome == 'failed':
            # Mark the test case as failed.
            #
            # This hook is called once for each of the three phases: setup, call, and teardown.
            # report.outcome only reflects the outcome of the current phase;
            # the 'call' phase outcome is the actual test result.
            item.stash[ITEM_FAILED_KEY] = True

        if call.when == 'teardown':
            item_failed = item.stash[ITEM_FAILED_KEY]
            if item_failed:
                # sub-cases reported by the unity test framework
                failed_sub_cases = item.stash.get(ITEM_FAILED_CASES_KEY, [])
                if failed_sub_cases:
                    for test_case_name in failed_sub_cases:
                        self._failed_cases.append((test_case_name, self._is_known_failure(test_case_name), False))
                else:  # the case itself is failing
                    test_case_name = item.funcargs.get('test_case_name', '')
                    if test_case_name:
                        self._failed_cases.append(
                            (
                                test_case_name,
                                self._is_known_failure(test_case_name),
                                report.keywords.get('xfail', False),
                            )
                        )

        return report

    def _is_known_failure(self, case_id: str) -> bool:
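        """Return True if ``case_id`` matches any known-failure pattern, exactly or via fnmatch."""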
        for pattern in self.known_failure_patterns:
            if case_id == pattern:
                return True
            if fnmatch(case_id, pattern):
                return True
        return False

    @pytest.hookimpl(trylast=True)
    def pytest_runtest_teardown(self, item: Function) -> None:
        """
        Format the JUnit reports generated by the test case and collect its failed sub-cases.
        """
        tempdir = item.funcargs.get('test_case_tempdir')
        if not tempdir:
            return

        junits = find_by_suffix('.xml', tempdir)
        if not junits:
            return

        if len(junits) > 1:
            merge_junit_files(junits, os.path.join(tempdir, 'dut.xml'))
            junits = [os.path.join(tempdir, 'dut.xml')]

        is_qemu = item.get_closest_marker('qemu') is not None
        failed_sub_cases = []
        target = item.funcargs['target']
        config = item.funcargs['config']
        for junit in junits:
            xml = ET.parse(junit)
            testcases = xml.findall('.//testcase')
            for case in testcases:
                # rewrite the case names in the JUnit files
                new_case_name = format_case_id(target, config, case.attrib['name'], is_qemu=is_qemu)
                case.attrib['name'] = new_case_name
                if 'file' in case.attrib:
                    case.attrib['file'] = case.attrib['file'].replace('/IDF/', '')  # drop the '/IDF/' path component added by our unity test framework

                # collect the real failed cases
                if case.find('failure') is not None:
                    failed_sub_cases.append(new_case_name)

            xml.write(junit)

        item.stash[ITEM_FAILED_CASES_KEY] = failed_sub_cases

    def pytest_sessionfinish(self, session: Session, exitstatus: int) -> None:
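        """Reset the exit status to 0 when no tests were collected, or when there are
        known failures but no real failed cases."""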
        if exitstatus != 0:
            if exitstatus == ExitCode.NO_TESTS_COLLECTED:
                session.exitstatus = 0
            elif self.known_failure_cases and not self.failed_cases:
                session.exitstatus = 0

    def pytest_terminal_summary(self, terminalreporter: TerminalReporter) -> None:
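        """Print the known-failure, xfail, and failed cases as separate terminal sections."""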
        if self.known_failure_cases:
            terminalreporter.section('Known failure cases', bold=True, yellow=True)
            terminalreporter.line('\n'.join(self.known_failure_cases))

        if self.xfail_cases:
            terminalreporter.section('xfail cases', bold=True, yellow=True)
            terminalreporter.line('\n'.join(self.xfail_cases))

        if self.failed_cases:
            terminalreporter.section('Failed cases', bold=True, red=True)
            terminalreporter.line('\n'.join(self.failed_cases))