2018-01-31 18:59:10 +08:00
|
|
|
"""
|
|
|
|
Command line tool to assign unit tests to CI test jobs.
|
|
|
|
"""
|
|
|
|
|
|
|
|
import re
|
|
|
|
import os
|
|
|
|
import sys
|
|
|
|
import argparse
|
|
|
|
|
|
|
|
import yaml
|
|
|
|
|
2018-12-04 13:46:48 +01:00
|
|
|
try:
|
|
|
|
from Utility import CIAssignTest
|
|
|
|
except ImportError:
|
|
|
|
test_fw_path = os.getenv("TEST_FW_PATH")
|
|
|
|
if test_fw_path:
|
|
|
|
sys.path.insert(0, test_fw_path)
|
|
|
|
from Utility import CIAssignTest
|
2018-01-31 18:59:10 +08:00
|
|
|
|
|
|
|
|
|
|
|
class Group(CIAssignTest.Group):
    """
    Group of unit test cases that will be assigned to one CI test job.

    Extends ``CIAssignTest.Group`` with the unit-test specific attribute
    lookup and with the job config output format used by unit tests.
    """
    # NOTE(review): SORT_KEYS, MAX_CASE and CI_JOB_MATCH_KEYS are not read in
    # this class; presumably they are consumed by CIAssignTest.Group - confirm
    # against the base class.
    SORT_KEYS = ["test environment", "tags"]
    MAX_CASE = 50
    # maps the attribute name used by callers to the key used in the case dict
    ATTR_CONVERT_TABLE = {
        "execution_time": "execution time"
    }
    # when IDF support multiple chips, SDK will be moved into tags, we can remove it
    CI_JOB_MATCH_KEYS = ["test environment", "SDK"]

    def __init__(self, case):
        """
        :param case: first test case (dict-like) of this group; must have a "tags" entry
        """
        super(Group, self).__init__(case)
        # every tag of the case also becomes a key used to match CI jobs
        # (self.ci_job_match_keys is expected to be created by the base class)
        for tag in self._get_case_attr(case, "tags"):
            self.ci_job_match_keys.add(tag)

    @staticmethod
    def _get_case_attr(case, attr):
        """
        Read one attribute of a test case.

        :param case: test case (dict-like)
        :param attr: attribute name; translated through ATTR_CONVERT_TABLE first
        :return: value stored in the case for the (translated) attribute
        :raise KeyError: if the case does not have the attribute
        """
        if attr in Group.ATTR_CONVERT_TABLE:
            attr = Group.ATTR_CONVERT_TABLE[attr]
        return case[attr]

    def _create_extra_data(self, test_cases, test_function):
        """
        For unit test case, we need to copy some attributes of test cases into config file.
        So unit test function knows how to run the case.

        :param test_cases: list of test cases that use ``test_function``
        :param test_function: name of the test function that will run the cases
        :return: list of dict, one per case, with the attributes copied from the case
        :raise KeyError: if a multiple devices/stages case has no "child case num"
        """
        case_data = []
        for case in test_cases:
            one_case_data = {
                "config": self._get_case_attr(case, "config"),
                "name": self._get_case_attr(case, "summary"),
                "reset": self._get_case_attr(case, "reset"),
                "timeout": self._get_case_attr(case, "timeout"),
            }

            if test_function in ["run_multiple_devices_cases", "run_multiple_stage_cases"]:
                try:
                    one_case_data["child case num"] = self._get_case_attr(case, "child case num")
                except KeyError:
                    print("multiple devices/stages cases must contains at least two test functions")
                    print("case name: {}".format(one_case_data["name"]))
                    # bare raise re-raises the active KeyError with its original traceback
                    raise

            case_data.append(one_case_data)
        return case_data

    def _divide_case_by_test_function(self):
        """
        divide cases of current test group by test function they need to use

        :return: dict of list of cases for each test functions
        """
        case_by_test_function = {
            "run_multiple_devices_cases": [],
            "run_multiple_stage_cases": [],
            "run_unit_test_cases": [],
        }

        # "multi_device" is checked first, so a case marked with both flags
        # is run as a multiple devices case
        for case in self.case_list:
            if case["multi_device"] == "Yes":
                case_by_test_function["run_multiple_devices_cases"].append(case)
            elif case["multi_stage"] == "Yes":
                case_by_test_function["run_multiple_stage_cases"].append(case)
            else:
                case_by_test_function["run_unit_test_cases"].append(case)
        return case_by_test_function

    def output(self):
        """
        output data for job configs

        :return: {"CaseConfig": list of case configs for cases in this group}
        """
        case_by_test_function = self._divide_case_by_test_function()

        output_data = {
            # we don't need filter for test function, as UT uses a few test functions for all cases
            "CaseConfig": [
                {
                    "name": test_function,
                    "extra_data": self._create_extra_data(test_cases, test_function),
                }
                # dict.items() exists on both Python 2 and 3 (iteritems() is Python 2 only);
                # test functions without any case are left out of the config
                for test_function, test_cases in case_by_test_function.items() if test_cases
            ],
        }
        return output_data
|
|
|
|
|
|
|
|
|
|
|
|
class UnitTestAssignTest(CIAssignTest.AssignTest):
    """
    Assign unit test cases to CI jobs whose name starts with ``UT_``.
    """
    CI_TEST_JOB_PATTERN = re.compile(r"^UT_.+")

    def __init__(self, test_case_path, ci_config_file):
        """
        :param test_case_path: path of the yaml file that lists the unit test cases
        :param ci_config_file: CI config file, passed through to ``CIAssignTest.AssignTest``
        """
        CIAssignTest.AssignTest.__init__(self, test_case_path, ci_config_file, case_group=Group)

    def _search_cases(self, test_case_path, case_filter=None):
        """
        For unit test case, we don't search for test functions.
        The unit test cases is stored in a yaml file which is created in job build-idf-test.

        :param test_case_path: path of the yaml file; its "test cases" entry is the case list
        :param case_filter: optional dict of {lower case key: accepted values};
                            a case is kept when, for every key, its value is accepted
                            or the case does not have the key
        :return: list of the cases that passed the filter, sorted by
                 config + multi_stage + multi_device
        """
        try:
            with open(test_case_path, "r") as f:
                # Pass the loader explicitly: yaml.load() without a Loader is deprecated
                # since PyYAML 5.1 and is a TypeError since PyYAML 6. yaml.Loader is the
                # historical default, so parsing is unchanged.
                # NOTE(review): yaml.Loader can construct arbitrary Python objects. Switch
                # to yaml.safe_load once it is confirmed the file never contains python tags.
                raw_data = yaml.load(f, Loader=yaml.Loader)
            test_cases = raw_data["test cases"]
        except IOError:
            print("Test case path is invalid. Should only happen when use @bot to skip unit test.")
            test_cases = []

        # filter keys are lower case. Do map lower case keys with original keys.
        # The mapping is built from the keys of the first case only.
        try:
            key_mapping = {x.lower(): x for x in test_cases[0].keys()}
        except IndexError:
            key_mapping = dict()

        if case_filter:
            # apply the filter keys one after another, each on the result of the previous one
            for key in case_filter:
                filtered_cases = []
                for case in test_cases:
                    try:
                        mapped_key = key_mapping[key]
                        # bot converts string to lower case
                        if isinstance(case[mapped_key], str):
                            _value = case[mapped_key].lower()
                        else:
                            _value = case[mapped_key]
                        if _value in case_filter[key]:
                            filtered_cases.append(case)
                    except KeyError:
                        # case don't have this key, regard as filter success
                        filtered_cases.append(case)
                test_cases = filtered_cases

        # sort cases with configs and test functions
        # in later stage cases with similar attributes are more likely to be assigned to the same job
        # it will reduce the count of flash DUT operations
        test_cases.sort(key=lambda x: x["config"] + x["multi_stage"] + x["multi_device"])
        return test_cases
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # positional command line arguments, in the order they must be given
    positional_args = (
        ("test_case", "test case folder or file"),
        ("ci_config_file", "gitlab ci config file"),
        ("output_path", "output path of config files"),
    )

    arg_parser = argparse.ArgumentParser()
    for arg_name, arg_help in positional_args:
        arg_parser.add_argument(arg_name, help=arg_help)
    cli_args = arg_parser.parse_args()

    # assign the cases to the CI jobs, then write the configs to the output path
    assigner = UnitTestAssignTest(cli_args.test_case, cli_args.ci_config_file)
    assigner.assign_cases()
    assigner.output_configs(cli_args.output_path)
|