""" Command line tool to assign tests to CI test jobs. """ import argparse import errno import json import os import re from copy import deepcopy import yaml try: from yaml import CLoader as Loader except ImportError: from yaml import Loader as Loader import gitlab_api from tiny_test_fw.Utility import CIAssignTest try: from idf_py_actions.constants import PREVIEW_TARGETS, SUPPORTED_TARGETS except ImportError: SUPPORTED_TARGETS = [] PREVIEW_TARGETS = [] IDF_PATH_FROM_ENV = os.getenv('IDF_PATH', '') class IDFCaseGroup(CIAssignTest.Group): LOCAL_BUILD_DIR = None BUILD_JOB_NAMES = None @classmethod def get_artifact_index_file(cls): assert cls.LOCAL_BUILD_DIR if IDF_PATH_FROM_ENV: artifact_index_file = os.path.join(IDF_PATH_FROM_ENV, cls.LOCAL_BUILD_DIR, 'artifact_index.json') else: artifact_index_file = 'artifact_index.json' return artifact_index_file class IDFAssignTest(CIAssignTest.AssignTest): DEFAULT_FILTER = { 'category': 'function', 'ignore': False, 'supported_in_ci': True, 'nightly_run': False, } def __init__(self, test_case_path, ci_config_file, case_group=IDFCaseGroup): super(IDFAssignTest, self).__init__(test_case_path, ci_config_file, case_group) def format_build_log_path(self, parallel_num): return '{}/list_job_{}.json'.format(self.case_group.LOCAL_BUILD_DIR, parallel_num) def create_artifact_index_file(self, project_id=None, pipeline_id=None): if project_id is None: project_id = os.getenv('CI_PROJECT_ID') if pipeline_id is None: pipeline_id = os.getenv('CI_PIPELINE_ID') gitlab_inst = gitlab_api.Gitlab(project_id) artifact_index_list = [] for build_job_name in self.case_group.BUILD_JOB_NAMES: job_info_list = gitlab_inst.find_job_id(build_job_name, pipeline_id=pipeline_id) for job_info in job_info_list: parallel_num = job_info['parallel_num'] or 1 # Could be None if "parallel_num" not defined for the job raw_data = gitlab_inst.download_artifact(job_info['id'], [self.format_build_log_path(parallel_num)])[0] build_info_list = [json.loads(line) for line in raw_data.decode().splitlines()] for build_info in build_info_list: build_info['ci_job_id'] = job_info['id'] artifact_index_list.append(build_info) artifact_index_file = self.case_group.get_artifact_index_file() try: os.makedirs(os.path.dirname(artifact_index_file)) except OSError as e: if e.errno != errno.EEXIST: raise e with open(artifact_index_file, 'w') as f: json.dump(artifact_index_list, f) def search_cases(self, case_filter=None): _filter = deepcopy(case_filter) if case_filter else {} if 'NIGHTLY_RUN' in os.environ or 'BOT_LABEL_NIGHTLY_RUN' in os.environ: _filter.update({'nightly_run': True}) return super().search_cases(_filter) class ExampleGroup(IDFCaseGroup): SORT_KEYS = CI_JOB_MATCH_KEYS = ['env_tag', 'target'] LOCAL_BUILD_DIR = 'build_examples' EXAMPLE_TARGETS = SUPPORTED_TARGETS + PREVIEW_TARGETS BUILD_JOB_NAMES = ['build_examples_cmake_{}'.format(target) for target in EXAMPLE_TARGETS] class TestAppsGroup(ExampleGroup): LOCAL_BUILD_DIR = 'build_test_apps' TEST_APP_TARGETS = SUPPORTED_TARGETS + PREVIEW_TARGETS BUILD_JOB_NAMES = ['build_test_apps_{}'.format(target) for target in TEST_APP_TARGETS] class ComponentUTGroup(TestAppsGroup): LOCAL_BUILD_DIR = 'build_component_ut' UNIT_TEST_TARGETS = SUPPORTED_TARGETS + PREVIEW_TARGETS BUILD_JOB_NAMES = ['build_component_ut_{}'.format(target) for target in UNIT_TEST_TARGETS] class UnitTestGroup(IDFCaseGroup): SORT_KEYS = ['test environment', 'tags', 'chip_target'] CI_JOB_MATCH_KEYS = ['test environment'] LOCAL_BUILD_DIR = 'tools/unit-test-app/builds' UNIT_TEST_TARGETS = SUPPORTED_TARGETS + PREVIEW_TARGETS BUILD_JOB_NAMES = ['build_esp_idf_tests_cmake_{}'.format(target) for target in UNIT_TEST_TARGETS] MAX_CASE = 50 ATTR_CONVERT_TABLE = { 'execution_time': 'execution time' } DUT_CLS_NAME = { 'esp32': 'ESP32DUT', 'esp32s2': 'ESP32S2DUT', 'esp32c3': 'ESP32C3DUT', 'esp8266': 'ESP8266DUT', } def __init__(self, case): super(UnitTestGroup, self).__init__(case) for tag in self._get_case_attr(case, 'tags'): self.ci_job_match_keys.add(tag) @staticmethod def _get_case_attr(case, attr): if attr in UnitTestGroup.ATTR_CONVERT_TABLE: attr = UnitTestGroup.ATTR_CONVERT_TABLE[attr] return case[attr] def add_extra_case(self, case): """ If current group contains all tags required by case, then add succeed """ added = False if self.accept_new_case(): for key in self.filters: if self._get_case_attr(case, key) != self.filters[key]: if key == 'tags': if set(self._get_case_attr(case, key)).issubset(set(self.filters[key])): continue break else: self.case_list.append(case) added = True return added def _create_extra_data(self, test_cases, test_function): """ For unit test case, we need to copy some attributes of test cases into config file. So unit test function knows how to run the case. """ case_data = [] for case in test_cases: one_case_data = { 'config': self._get_case_attr(case, 'config'), 'name': self._get_case_attr(case, 'summary'), 'reset': self._get_case_attr(case, 'reset'), 'timeout': self._get_case_attr(case, 'timeout'), } if test_function in ['run_multiple_devices_cases', 'run_multiple_stage_cases']: try: one_case_data['child case num'] = self._get_case_attr(case, 'child case num') except KeyError as e: print('multiple devices/stages cases must contains at least two test functions') print('case name: {}'.format(one_case_data['name'])) raise e case_data.append(one_case_data) return case_data def _divide_case_by_test_function(self): """ divide cases of current test group by test function they need to use :return: dict of list of cases for each test functions """ case_by_test_function = { 'run_multiple_devices_cases': [], 'run_multiple_stage_cases': [], 'run_unit_test_cases': [], } for case in self.case_list: if case['multi_device'] == 'Yes': case_by_test_function['run_multiple_devices_cases'].append(case) elif case['multi_stage'] == 'Yes': case_by_test_function['run_multiple_stage_cases'].append(case) else: case_by_test_function['run_unit_test_cases'].append(case) return case_by_test_function def output(self): """ output data for job configs :return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group} """ target = self._get_case_attr(self.case_list[0], 'chip_target') if target: overwrite = { 'dut': { 'package': 'ttfw_idf', 'class': self.DUT_CLS_NAME[target], } } else: overwrite = dict() case_by_test_function = self._divide_case_by_test_function() output_data = { # we don't need filter for test function, as UT uses a few test functions for all cases 'CaseConfig': [ { 'name': test_function, 'extra_data': self._create_extra_data(test_cases, test_function), 'overwrite': overwrite, } for test_function, test_cases in case_by_test_function.items() if test_cases ], } return output_data class ExampleAssignTest(IDFAssignTest): CI_TEST_JOB_PATTERN = re.compile(r'^example_test_.+') def __init__(self, test_case_path, ci_config_file): super(ExampleAssignTest, self).__init__(test_case_path, ci_config_file, case_group=ExampleGroup) class TestAppsAssignTest(IDFAssignTest): CI_TEST_JOB_PATTERN = re.compile(r'^test_app_test_.+') def __init__(self, test_case_path, ci_config_file): super(TestAppsAssignTest, self).__init__(test_case_path, ci_config_file, case_group=TestAppsGroup) class ComponentUTAssignTest(IDFAssignTest): CI_TEST_JOB_PATTERN = re.compile(r'^component_ut_test_.+') def __init__(self, test_case_path, ci_config_file): super(ComponentUTAssignTest, self).__init__(test_case_path, ci_config_file, case_group=ComponentUTGroup) class UnitTestAssignTest(IDFAssignTest): CI_TEST_JOB_PATTERN = re.compile(r'^UT_.+') def __init__(self, test_case_path, ci_config_file): super(UnitTestAssignTest, self).__init__(test_case_path, ci_config_file, case_group=UnitTestGroup) def search_cases(self, case_filter=None): """ For unit test case, we don't search for test functions. The unit test cases is stored in a yaml file which is created in job build-idf-test. """ def find_by_suffix(suffix, path): res = [] for root, _, files in os.walk(path): for file in files: if file.endswith(suffix): res.append(os.path.join(root, file)) return res def get_test_cases_from_yml(yml_file): try: with open(yml_file) as fr: raw_data = yaml.load(fr, Loader=Loader) test_cases = raw_data['test cases'] except (IOError, KeyError): return [] else: return test_cases test_cases = [] for path in self.test_case_paths: if os.path.isdir(path): for yml_file in find_by_suffix('.yml', path): test_cases.extend(get_test_cases_from_yml(yml_file)) elif os.path.isfile(path) and path.endswith('.yml'): test_cases.extend(get_test_cases_from_yml(path)) else: print('Test case path is invalid. Should only happen when use @bot to skip unit test.') # filter keys are lower case. Do map lower case keys with original keys. try: key_mapping = {x.lower(): x for x in test_cases[0].keys()} except IndexError: key_mapping = dict() if case_filter: for key in case_filter: filtered_cases = [] for case in test_cases: try: mapped_key = key_mapping[key] # bot converts string to lower case if isinstance(case[mapped_key], str): _value = case[mapped_key].lower() else: _value = case[mapped_key] if _value in case_filter[key]: filtered_cases.append(case) except KeyError: # case don't have this key, regard as filter success filtered_cases.append(case) test_cases = filtered_cases # sort cases with configs and test functions # in later stage cases with similar attributes are more likely to be assigned to the same job # it will reduce the count of flash DUT operations test_cases.sort(key=lambda x: x['config'] + x['multi_stage'] + x['multi_device']) return test_cases if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('case_group', choices=['example_test', 'custom_test', 'unit_test', 'component_ut']) parser.add_argument('test_case_paths', nargs='+', help='test case folder or file') parser.add_argument('-c', '--config', default=os.path.join(IDF_PATH_FROM_ENV, '.gitlab', 'ci', 'target-test.yml'), help='gitlab ci config file') parser.add_argument('-o', '--output', help='output path of config files') parser.add_argument('--pipeline_id', '-p', type=int, default=None, help='pipeline_id') parser.add_argument('--test-case-file-pattern', help='file name pattern used to find Python test case files') args = parser.parse_args() SUPPORTED_TARGETS.extend(PREVIEW_TARGETS) test_case_paths = [os.path.join(IDF_PATH_FROM_ENV, path) if not os.path.isabs(path) else path for path in args.test_case_paths] # type: ignore args_list = [test_case_paths, args.config] if args.case_group == 'example_test': assigner = ExampleAssignTest(*args_list) elif args.case_group == 'custom_test': assigner = TestAppsAssignTest(*args_list) elif args.case_group == 'unit_test': assigner = UnitTestAssignTest(*args_list) elif args.case_group == 'component_ut': assigner = ComponentUTAssignTest(*args_list) else: raise SystemExit(1) # which is impossible if args.test_case_file_pattern: assigner.CI_TEST_JOB_PATTERN = re.compile(r'{}'.format(args.test_case_file_pattern)) assigner.assign_cases() assigner.output_configs(args.output) assigner.create_artifact_index_file()