esp-idf/tools/ci/idf_ci_utils.py

276 lines
8.4 KiB
Python

# SPDX-FileCopyrightText: 2020-2024 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
# internal use only for CI
# some CI related util functions
import logging
import os
import re
import subprocess
import sys
import typing as t
from functools import cached_property
from pathlib import Path
IDF_PATH: str = os.path.abspath(os.getenv('IDF_PATH', os.path.join(os.path.dirname(__file__), '..', '..')))
def get_submodule_dirs(full_path: bool = False) -> t.List[str]:
"""
To avoid issue could be introduced by multi-os or additional dependency,
we use python and git to get this output
:return: List of submodule dirs
"""
dirs = []
try:
lines = (
subprocess.check_output(
[
'git',
'config',
'--file',
os.path.realpath(os.path.join(IDF_PATH, '.gitmodules')),
'--get-regexp',
'path',
]
)
.decode('utf8')
.strip()
.split('\n')
)
for line in lines:
_, path = line.split(' ')
if full_path:
dirs.append(os.path.join(IDF_PATH, path))
else:
dirs.append(path)
except Exception as e: # pylint: disable=W0703
logging.warning(str(e))
return dirs
def _check_git_filemode(full_path: str) -> bool:
try:
stdout = subprocess.check_output(['git', 'ls-files', '--stage', full_path]).strip().decode('utf-8')
except subprocess.CalledProcessError:
return True
mode = stdout.split(' ', 1)[0] # e.g. 100644 for a rw-r--r--
if any([int(i, 8) & 1 for i in mode[-3:]]):
return True
return False
def is_executable(full_path: str) -> bool:
"""
os.X_OK will always return true on windows. Use git to check file mode.
:param full_path: file full path
:return: True is it's an executable file
"""
if sys.platform == 'win32':
return _check_git_filemode(full_path)
return os.access(full_path, os.X_OK)
def get_git_files(path: str = IDF_PATH, full_path: bool = False) -> t.List[str]:
"""
Get the result of git ls-files
:param path: path to run git ls-files
:param full_path: return full path if set to True
:return: list of file paths
"""
try:
# this is a workaround when using under worktree
# if you're using worktree, when running git commit a new environment variable GIT_DIR would be declared,
# the value should be <origin_repo_path>/.git/worktrees/<worktree name>
# This would affect the return value of `git ls-files`, unset this would use the `cwd`value or its parent
# folder if no `.git` folder found in `cwd`.
workaround_env = os.environ.copy()
workaround_env.pop('GIT_DIR', None)
files = (
subprocess.check_output(['git', 'ls-files'], cwd=path, env=workaround_env)
.decode('utf8')
.strip()
.split('\n')
)
except Exception as e: # pylint: disable=W0703
logging.warning(str(e))
files = []
return [os.path.join(path, f) for f in files] if full_path else files
def to_list(s: t.Any) -> t.List[t.Any]:
if not s:
return []
if isinstance(s, (set, tuple)):
return list(s)
if isinstance(s, list):
return s
return [s]
class GitlabYmlConfig:
def __init__(self, root_yml_filepath: str = os.path.join(IDF_PATH, '.gitlab-ci.yml')) -> None:
self._config: t.Dict[str, t.Any] = {}
self._defaults: t.Dict[str, t.Any] = {}
self._load(root_yml_filepath)
def _load(self, root_yml_filepath: str) -> None:
# avoid unused import in other pre-commit hooks
import yaml
all_config = dict()
root_yml = yaml.load(open(root_yml_filepath), Loader=yaml.FullLoader)
# expanding "include"
for item in root_yml.pop('include', []) or []:
all_config.update(yaml.load(open(os.path.join(IDF_PATH, item)), Loader=yaml.FullLoader))
if 'default' in all_config:
self._defaults = all_config.pop('default')
self._config = all_config
# anchor is the string that will be reused in templates
self._anchor_keys: t.Set[str] = set()
# template is a dict that will be extended
self._template_keys: t.Set[str] = set()
self._used_template_keys: t.Set[str] = set() # tracing the used templates
# job is a dict that will be executed
self._job_keys: t.Set[str] = set()
self.expand_extends()
@property
def default(self) -> t.Dict[str, t.Any]:
return self._defaults
@property
def config(self) -> t.Dict[str, t.Any]:
return self._config
@cached_property
def global_keys(self) -> t.List[str]:
return ['default', 'include', 'workflow', 'variables', 'stages']
@cached_property
def anchors(self) -> t.Dict[str, t.Any]:
return {k: v for k, v in self.config.items() if k in self._anchor_keys}
@cached_property
def jobs(self) -> t.Dict[str, t.Any]:
return {k: v for k, v in self.config.items() if k in self._job_keys}
@cached_property
def templates(self) -> t.Dict[str, t.Any]:
return {k: v for k, v in self.config.items() if k in self._template_keys}
@cached_property
def used_templates(self) -> t.Set[str]:
return self._used_template_keys
def expand_extends(self) -> None:
"""
expand the `extends` key in-place.
"""
for k, v in self.config.items():
if k in self.global_keys:
continue
if isinstance(v, (str, list)):
self._anchor_keys.add(k)
elif k.startswith('.if-'):
self._anchor_keys.add(k)
elif k.startswith('.'):
self._template_keys.add(k)
elif isinstance(v, dict):
self._job_keys.add(k)
else:
raise ValueError(f'Unknown type for key {k} with value {v}')
# no need to expand anchor
# expand template first
for k in self._template_keys:
self._expand_extends(k)
# expand job
for k in self._job_keys:
self._expand_extends(k)
def _merge_dict(self, d1: t.Dict[str, t.Any], d2: t.Dict[str, t.Any]) -> t.Any:
for k, v in d2.items():
if k in d1:
if isinstance(v, dict) and isinstance(d1[k], dict):
d1[k] = self._merge_dict(d1[k], v)
else:
d1[k] = v
else:
d1[k] = v
return d1
def _expand_extends(self, name: str) -> t.Dict[str, t.Any]:
extends = to_list(self.config[name].pop('extends', None))
if not extends:
return self.config[name] # type: ignore
original_d = self.config[name].copy()
d = {}
while extends:
self._used_template_keys.update(extends) # for tracking
for i in extends:
d.update(self._expand_extends(i))
extends = to_list(self.config[name].pop('extends', None))
self.config[name] = self._merge_dict(d, original_d)
return self.config[name] # type: ignore
def get_all_manifest_files() -> t.List[str]:
paths: t.List[str] = []
for p in Path(IDF_PATH).glob('**/.build-test-rules.yml'):
if 'managed_components' in p.parts:
continue
paths.append(str(p))
return paths
def sanitize_job_name(name: str) -> str:
"""
Sanitize the job name from CI_JOB_NAME
- for job with `parallel: int` set, the `CI_JOB_NAME` would be `job_name index/total`, like `foo 1/3`
- for job with `parallel: matrix` set, the `CI_JOB_NAME` would be `job_name: [var1, var2]`, like `foo: [a, b]`
We consider
- the jobs generated by `parallel: int` as the same job, i.e., we remove the index/total part.
- the jobs generated by `parallel: matrix` as different jobs, so we keep the matrix part.
:param name: job name
:return: sanitized job name
"""
return re.sub(r' \d+/\d+', '', name)
def idf_relpath(p: str) -> str:
"""
Turn all paths under IDF_PATH to relative paths
:param p: path
:return: relpath to IDF_PATH, or absolute path if not under IDF_PATH
"""
abs_path = os.path.abspath(p)
if abs_path.startswith(IDF_PATH):
return os.path.relpath(abs_path, IDF_PATH)
else:
return abs_path