# SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 import os import typing as t from functools import cached_property from xml.etree import ElementTree as ET import pytest from _pytest.config import ExitCode from _pytest.main import Session from _pytest.python import Function from _pytest.runner import CallInfo from pytest_embedded import Dut from pytest_embedded.plugin import parse_multi_dut_args from pytest_embedded.utils import find_by_suffix, to_list from pytest_ignore_test_results.ignore_results import ChildCase, ChildCasesStashKey from .constants import DEFAULT_SDKCONFIG, PREVIEW_TARGETS, SUPPORTED_TARGETS, CollectMode, PytestApp, PytestCase from .utils import comma_sep_str_to_list, format_case_id, merge_junit_files IDF_PYTEST_EMBEDDED_KEY = pytest.StashKey['IdfPytestEmbedded']() ITEM_FAILED_CASES_KEY = pytest.StashKey[list]() ITEM_FAILED_KEY = pytest.StashKey[bool]() ITEM_PYTEST_CASE_KEY = pytest.StashKey[PytestCase]() class IdfPytestEmbedded: UNITY_RESULT_MAPPINGS = { 'PASS': 'passed', 'FAIL': 'failed', 'IGNORE': 'skipped', } def __init__( self, target: t.Union[t.List[str], str], *, single_target_duplicate_mode: bool = False, apps_list: t.Optional[t.List[str]] = None, ): if isinstance(target, str): self.target = sorted(comma_sep_str_to_list(target)) else: self.target = sorted(target) if not self.target: raise ValueError('`target` should not be empty') # these are useful while gathering all the multi-dut test cases # when this mode is activated, # # pytest.mark.esp32 # pytest.mark.parametrize('count', [2], indirect=True) # def test_foo(dut): # pass # # should be collected when running `pytest --target esp32` # # otherwise, it should be collected when running `pytest --target esp32,esp32` self._single_target_duplicate_mode = single_target_duplicate_mode self.apps_list = apps_list self.cases: t.List[PytestCase] = [] @cached_property def collect_mode(self) -> CollectMode: if len(self.target) == 1: if self.target[0] == CollectMode.MULTI_ALL_WITH_PARAM: return CollectMode.MULTI_ALL_WITH_PARAM else: return CollectMode.SINGLE_SPECIFIC else: return CollectMode.MULTI_SPECIFIC @staticmethod def get_param(item: Function, key: str, default: t.Any = None) -> t.Any: # funcargs is not calculated while collection # callspec is something defined in parametrize if not hasattr(item, 'callspec'): return default return item.callspec.params.get(key, default) or default def item_to_pytest_case(self, item: Function) -> PytestCase: """ Turn pytest item to PytestCase """ count = self.get_param(item, 'count', 1) # default app_path is where the test script locates app_paths = to_list( parse_multi_dut_args(count, os.path.relpath(self.get_param(item, 'app_path', os.path.dirname(item.path)))) ) configs = to_list(parse_multi_dut_args(count, self.get_param(item, 'config', DEFAULT_SDKCONFIG))) targets = to_list(parse_multi_dut_args(count, self.get_param(item, 'target', self.target[0]))) return PytestCase([PytestApp(app_paths[i], targets[i], configs[i]) for i in range(count)], item) @pytest.hookimpl(tryfirst=True) def pytest_collection_modifyitems(self, items: t.List[Function]) -> None: """ Background info: We're using `pytest.mark.[TARGET]` as a syntactic sugar to indicate that they are actually supported by all the listed targets. For example, >>> @pytest.mark.esp32 >>> @pytest.mark.esp32s2 should be treated as >>> @pytest.mark.parametrize('target', [ >>> 'esp32', >>> 'esp32s2', >>> ], indirect=True) All single-dut test cases, and some of the multi-dut test cases with the same targets, are using this way to indicate the supported targets. To avoid ambiguity, - when we're collecting single-dut test cases with esp32, we call `pytest --collect-only --target esp32` - when we're collecting multi-dut test cases, we list all the targets, even when they're the same `pytest --collect-only --target esp32,esp32` for two esp32 connected `pytest --collect-only --target esp32,esp32s2` for esp32 and esp32s2 connected therefore, we have two different logic for searching test cases, explained in 2.1 and 2.2 """ # 1. Filter according to nighty_run related markers if os.getenv('INCLUDE_NIGHTLY_RUN') == '1': # nightly_run and non-nightly_run cases are both included pass elif os.getenv('NIGHTLY_RUN') == '1': # only nightly_run cases are included items[:] = [_item for _item in items if _item.get_closest_marker('nightly_run') is not None] else: # only non-nightly_run cases are included items[:] = [_item for _item in items if _item.get_closest_marker('nightly_run') is None] # 2. Add markers according to special markers item_to_case_dict: t.Dict[Function, PytestCase] = {} for item in items: item.stash[ITEM_PYTEST_CASE_KEY] = item_to_case_dict[item] = self.item_to_pytest_case(item) if 'supported_targets' in item.keywords: for _target in SUPPORTED_TARGETS: item.add_marker(_target) if 'preview_targets' in item.keywords: for _target in PREVIEW_TARGETS: item.add_marker(_target) if 'all_targets' in item.keywords: for _target in [*SUPPORTED_TARGETS, *PREVIEW_TARGETS]: item.add_marker(_target) # 3.1. CollectMode.SINGLE_SPECIFIC, like `pytest --target esp32` if self.collect_mode == CollectMode.SINGLE_SPECIFIC: filtered_items = [] for item in items: case = item_to_case_dict[item] # single-dut one if case.is_single_dut_test_case and self.target[0] in case.target_markers: filtered_items.append(item) # multi-dut ones and in single_target_duplicate_mode elif self._single_target_duplicate_mode and not case.is_single_dut_test_case: # ignore those test cases with `target` defined in parametrize, since these will be covered in 3.3 if self.get_param(item, 'target', None) is None and self.target[0] in case.target_markers: filtered_items.append(item) items[:] = filtered_items # 3.2. CollectMode.MULTI_SPECIFIC, like `pytest --target esp32,esp32` elif self.collect_mode == CollectMode.MULTI_SPECIFIC: items[:] = [_item for _item in items if item_to_case_dict[_item].targets == self.target] # 3.3. CollectMode.MULTI_ALL_WITH_PARAM, intended to be used by `get_pytest_cases` else: items[:] = [ _item for _item in items if not item_to_case_dict[_item].is_single_dut_test_case and self.get_param(_item, 'target', None) is not None ] # 4. filter by `self.apps_list`, skip the test case if not listed # should only be used in CI items[:] = [_item for _item in items if item_to_case_dict[_item].all_built_in_app_lists(self.apps_list)] # OKAY!!! All left ones will be executed, sort it and add more markers items[:] = sorted( items, key=lambda x: (os.path.dirname(x.path), self.get_param(x, 'config', DEFAULT_SDKCONFIG)) ) for item in items: case = item_to_case_dict[item] # set default timeout 10 minutes for each case if 'timeout' not in item.keywords: item.add_marker(pytest.mark.timeout(10 * 60)) # add 'xtal_40mhz' tag as a default tag for esp32c2 target # only add this marker for esp32c2 cases if 'esp32c2' in self.target and 'esp32c2' in case.targets and 'xtal_26mhz' not in case.all_markers: item.add_marker('xtal_40mhz') def pytest_report_collectionfinish(self, items: t.List[Function]) -> None: for item in items: self.cases.append(self.item_to_pytest_case(item)) def pytest_custom_test_case_name(self, item: Function) -> str: return item.funcargs.get('test_case_name', item.nodeid) # type: ignore def pytest_runtest_makereport(self, item: Function, call: CallInfo[None]) -> None: if call.when == 'call': target = item.funcargs['target'] config = item.funcargs['config'] is_qemu = item.get_closest_marker('qemu') is not None dut: t.Union[Dut, t.Tuple[Dut]] = item.funcargs['dut'] # type: ignore if isinstance(dut, (list, tuple)): res = [] for i, _dut in enumerate(dut): res.extend( [ ChildCase( format_case_id(target, config, case.name + f' {i}', is_qemu=is_qemu), self.UNITY_RESULT_MAPPINGS[case.result], ) for case in _dut.testsuite.testcases ] ) item.config.stash[ChildCasesStashKey] = {item.nodeid: res} else: item.config.stash[ChildCasesStashKey] = { item.nodeid: [ ChildCase( format_case_id(target, config, case.name, is_qemu=is_qemu), self.UNITY_RESULT_MAPPINGS[case.result], ) for case in dut.testsuite.testcases ] } @pytest.hookimpl(trylast=True) def pytest_runtest_teardown(self, item: Function) -> None: """ Modify the junit reports. Format the unity c test case names. """ tempdir: t.Optional[str] = item.funcargs.get('test_case_tempdir') # type: ignore if not tempdir: return junits = find_by_suffix('.xml', tempdir) if not junits: return if len(junits) > 1: merge_junit_files(junits, os.path.join(tempdir, 'dut.xml')) junits = [os.path.join(tempdir, 'dut.xml')] # unity cases is_qemu = item.get_closest_marker('qemu') is not None target = item.funcargs['target'] config = item.funcargs['config'] for junit in junits: xml = ET.parse(junit) testcases = xml.findall('.//testcase') for case in testcases: # modify the junit files new_case_name = format_case_id(target, config, case.attrib['name'], is_qemu=is_qemu) case.attrib['name'] = new_case_name if 'file' in case.attrib: case.attrib['file'] = case.attrib['file'].replace('/IDF/', '') # our unity test framework xml.write(junit) def pytest_sessionfinish(self, session: Session, exitstatus: int) -> None: if exitstatus != 0: if exitstatus == ExitCode.NO_TESTS_COLLECTED: session.exitstatus = 0