ci: Add python types hints

This commit is contained in:
simon.chupin 2022-06-28 19:00:12 +02:00
parent 05b31339f6
commit 819d5a2b61
15 changed files with 181 additions and 181 deletions

View File

@ -17,14 +17,14 @@ if not IDF_PATH:
GITLAB_CONFIG_FILE = os.path.join(IDF_PATH, '.gitlab-ci.yml')
def check_artifacts_expire_time():
def check_artifacts_expire_time() -> None:
with open(GITLAB_CONFIG_FILE, 'r') as f:
config = yaml.load(f, Loader=yaml.FullLoader)
# load files listed in `include`
if 'include' in config:
for _file in config['include']:
with open(os.path.join(IDF_PATH, _file)) as f:
with open(os.path.join(IDF_PATH or '', _file)) as f:
config.update(yaml.load(f, Loader=yaml.FullLoader))
print('expire time for jobs:')

View File

@ -8,15 +8,11 @@ import argparse
import os
import re
from functools import partial
from typing import BinaryIO, Callable, Dict, Generator, List, Optional, Tuple
import elftools
from elftools.elf import elffile
try:
from typing import BinaryIO, Callable, Dict, Generator, List, Optional, Tuple
except ImportError:
pass
FUNCTION_REGEX = re.compile(
r'^;; Function (?P<mangle>.*)\s+\((?P<function>\S+)(,.*)?\).*$'
)
@ -25,29 +21,29 @@ SYMBOL_REF_REGEX = re.compile(r'^.*\(symbol_ref[^()]*\("(?P<target>.*)"\).*$')
class RtlFunction(object):
def __init__(self, name, rtl_filename, tu_filename):
def __init__(self, name: str, rtl_filename: str, tu_filename: str) -> None:
self.name = name
self.rtl_filename = rtl_filename
self.tu_filename = tu_filename
self.calls = list() # type: List[str]
self.refs = list() # type: List[str]
self.calls: List[str] = list()
self.refs: List[str] = list()
self.sym = None
class SectionAddressRange(object):
def __init__(self, name, addr, size): # type: (str, int, int) -> None
def __init__(self, name: str, addr: int, size: int) -> None:
self.name = name
self.low = addr
self.high = addr + size
def __str__(self):
def __str__(self) -> str:
return '{}: 0x{:08x} - 0x{:08x}'.format(self.name, self.low, self.high)
def contains_address(self, addr):
def contains_address(self, addr: int) -> bool:
return self.low <= addr < self.high
TARGET_SECTIONS = {
TARGET_SECTIONS: Dict[str, List[SectionAddressRange]] = {
'esp32': [
SectionAddressRange('.rom.text', 0x40000000, 0x70000),
SectionAddressRange('.rom.rodata', 0x3ff96000, 0x9018)
@ -60,20 +56,20 @@ TARGET_SECTIONS = {
SectionAddressRange('.rom.text', 0x40000000, 0x568d0),
SectionAddressRange('.rom.rodata', 0x3ff071c0, 0x8e30)
]
} # type: Dict[str, List[SectionAddressRange]]
}
class Symbol(object):
def __init__(self, name, addr, local, filename, section): # type: (str, int, bool, Optional[str], Optional[str]) -> None
def __init__(self, name: str, addr: int, local: bool, filename: Optional[str], section: Optional[str]) -> None:
self.name = name
self.addr = addr
self.local = local
self.filename = filename
self.section = section
self.refers_to = list() # type: List[Symbol]
self.referred_from = list() # type: List[Symbol]
self.refers_to: List[Symbol] = list()
self.referred_from: List[Symbol] = list()
def __str__(self):
def __str__(self) -> str:
return '{} @0x{:08x} [{}]{} {}'.format(
self.name,
self.addr,
@ -84,11 +80,11 @@ class Symbol(object):
class Reference(object):
def __init__(self, from_sym, to_sym): # type: (Symbol, Symbol) -> None
def __init__(self, from_sym: Symbol, to_sym: Symbol) -> None:
self.from_sym = from_sym
self.to_sym = to_sym
def __str__(self):
def __str__(self) -> str:
return '{} @0x{:08x} ({}) -> {} @0x{:08x} ({})'.format(
self.from_sym.name,
self.from_sym.addr,
@ -100,13 +96,13 @@ class Reference(object):
class ElfInfo(object):
def __init__(self, elf_file): # type: (BinaryIO) -> None
def __init__(self, elf_file: BinaryIO) -> None:
self.elf_file = elf_file
self.elf_obj = elffile.ELFFile(self.elf_file)
self.section_ranges = self._load_sections()
self.symbols = self._load_symbols()
def _load_symbols(self): # type: () -> List[Symbol]
def _load_symbols(self) -> List[Symbol]:
symbols = []
for s in self.elf_obj.iter_sections():
if not isinstance(s, elftools.elf.sections.SymbolTableSection):
@ -130,7 +126,7 @@ class ElfInfo(object):
)
return symbols
def _load_sections(self): # type: () -> List[SectionAddressRange]
def _load_sections(self) -> List[SectionAddressRange]:
result = []
for segment in self.elf_obj.iter_segments():
if segment['p_type'] == 'PT_LOAD':
@ -149,22 +145,22 @@ class ElfInfo(object):
return result
def symbols_by_name(self, name): # type: (str) -> List[Symbol]
def symbols_by_name(self, name: str) -> List['Symbol']:
res = []
for sym in self.symbols:
if sym.name == name:
res.append(sym)
return res
def section_for_addr(self, sym_addr): # type: (int) -> Optional[str]
def section_for_addr(self, sym_addr: int) -> Optional[str]:
for sar in self.section_ranges:
if sar.contains_address(sym_addr):
return sar.name
return None
def load_rtl_file(rtl_filename, tu_filename, functions): # type: (str, str, List[RtlFunction]) -> None
last_function = None # type: Optional[RtlFunction]
def load_rtl_file(rtl_filename: str, tu_filename: str, functions: List[RtlFunction]) -> None:
last_function: Optional[RtlFunction] = None
for line in open(rtl_filename):
# Find function definition
match = re.match(FUNCTION_REGEX, line)
@ -192,7 +188,7 @@ def load_rtl_file(rtl_filename, tu_filename, functions): # type: (str, str, Lis
continue
def rtl_filename_matches_sym_filename(rtl_filename, symbol_filename): # type: (str, str) -> bool
def rtl_filename_matches_sym_filename(rtl_filename: str, symbol_filename: str) -> bool:
# Symbol file names (from ELF debug info) are short source file names, without path: "cpu_start.c".
# RTL file names are paths relative to the build directory, e.g.:
# "build/esp-idf/esp_system/CMakeFiles/__idf_esp_system.dir/port/cpu_start.c.234r.expand"
@ -211,7 +207,7 @@ class SymbolNotFound(RuntimeError):
pass
def find_symbol_by_name(name, elfinfo, local_func_matcher): # type: (str, ElfInfo, Callable[[Symbol], bool]) -> Optional[Symbol]
def find_symbol_by_name(name: str, elfinfo: ElfInfo, local_func_matcher: Callable[[Symbol], bool]) -> Optional[Symbol]:
"""
Find an ELF symbol for the given name.
    local_func_matcher is a callback function which checks if the candidate local symbol is suitable.
@ -238,7 +234,7 @@ def find_symbol_by_name(name, elfinfo, local_func_matcher): # type: (str, ElfIn
return local_candidate or global_candidate
def match_local_source_func(rtl_filename, sym): # type: (str, Symbol) -> bool
def match_local_source_func(rtl_filename: str, sym: Symbol) -> bool:
"""
Helper for match_rtl_funcs_to_symbols, checks if local symbol sym is a good candidate for the
reference source (caller), based on the RTL file name.
@ -247,7 +243,7 @@ def match_local_source_func(rtl_filename, sym): # type: (str, Symbol) -> bool
return rtl_filename_matches_sym_filename(rtl_filename, sym.filename)
def match_local_target_func(rtl_filename, sym_from, sym): # type: (str, Symbol, Symbol) -> bool
def match_local_target_func(rtl_filename: str, sym_from: Symbol, sym: Symbol) -> bool:
"""
Helper for match_rtl_funcs_to_symbols, checks if local symbol sym is a good candidate for the
reference target (callee or referenced data), based on RTL filename of the source symbol
@ -263,9 +259,9 @@ def match_local_target_func(rtl_filename, sym_from, sym): # type: (str, Symbol,
return rtl_filename_matches_sym_filename(rtl_filename, sym.filename)
def match_rtl_funcs_to_symbols(rtl_functions, elfinfo): # type: (List[RtlFunction], ElfInfo) -> Tuple[List[Symbol], List[Reference]]
symbols = [] # type: List[Symbol]
refs = [] # type: List[Reference]
def match_rtl_funcs_to_symbols(rtl_functions: List[RtlFunction], elfinfo: ElfInfo) -> Tuple[List[Symbol], List[Reference]]:
symbols: List[Symbol] = []
refs: List[Reference] = []
# General idea:
# - iterate over RTL functions.
@ -308,17 +304,17 @@ def match_rtl_funcs_to_symbols(rtl_functions, elfinfo): # type: (List[RtlFuncti
return symbols, refs
def get_symbols_and_refs(rtl_list, elf_file): # type: (List[str], BinaryIO) -> Tuple[List[Symbol], List[Reference]]
def get_symbols_and_refs(rtl_list: List[str], elf_file: BinaryIO) -> Tuple[List[Symbol], List[Reference]]:
elfinfo = ElfInfo(elf_file)
rtl_functions = [] # type: List[RtlFunction]
rtl_functions: List[RtlFunction] = []
for file_name in rtl_list:
load_rtl_file(file_name, file_name, rtl_functions)
return match_rtl_funcs_to_symbols(rtl_functions, elfinfo)
def list_refs_from_to_sections(refs, from_sections, to_sections): # type: (List[Reference], List[str], List[str]) -> int
def list_refs_from_to_sections(refs: List[Reference], from_sections: List[str], to_sections: List[str]) -> int:
found = 0
for ref in refs:
if (not from_sections or ref.from_sym.section in from_sections) and \
@ -328,7 +324,7 @@ def list_refs_from_to_sections(refs, from_sections, to_sections): # type: (List
return found
def find_files_recursive(root_path, ext): # type: (str, str) -> Generator[str, None, None]
def find_files_recursive(root_path: str, ext: str) -> Generator[str, None, None]:
for root, _, files in os.walk(root_path):
for basename in files:
if basename.endswith(ext):
@ -336,7 +332,7 @@ def find_files_recursive(root_path, ext): # type: (str, str) -> Generator[str,
yield filename
def main():
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument(
@ -379,7 +375,7 @@ def main():
args = parser.parse_args()
if args.rtl_list:
with open(args.rtl_list, 'r') as rtl_list_file:
rtl_list = [line.strip for line in rtl_list_file]
rtl_list = [line.strip() for line in rtl_list_file]
else:
if not args.rtl_dir:
raise RuntimeError('Either --rtl-list or --rtl-dir must be specified')

View File

@ -10,6 +10,7 @@ import os
import re
import subprocess
import sys
from typing import List, Optional
from idf_ci_utils import IDF_PATH
@ -17,7 +18,7 @@ CODEOWNERS_PATH = os.path.join(IDF_PATH, '.gitlab', 'CODEOWNERS')
CODEOWNER_GROUP_PREFIX = '@esp-idf-codeowners/'
def get_all_files():
def get_all_files() -> List[str]:
"""
Get list of all file paths in the repository.
"""
@ -25,7 +26,7 @@ def get_all_files():
return subprocess.check_output(['git', 'ls-files'], cwd=IDF_PATH).decode('utf-8').strip().split('\n')
def pattern_to_regex(pattern):
def pattern_to_regex(pattern: str) -> str:
"""
Convert the CODEOWNERS path pattern into a regular expression string.
"""
@ -59,14 +60,14 @@ def pattern_to_regex(pattern):
return re_pattern
def files_by_regex(all_files, regex):
def files_by_regex(all_files: List, regex: re.Pattern) -> List:
"""
    Return all files in the repository matching the given regular expression.
"""
return [file for file in all_files if regex.search('/' + file)]
def files_by_pattern(all_files, pattern=None):
def files_by_pattern(all_files: list, pattern: Optional[str]=None) -> List:
"""
Return all the files in the repository matching the given CODEOWNERS pattern.
"""
@ -76,7 +77,7 @@ def files_by_pattern(all_files, pattern=None):
return files_by_regex(all_files, re.compile(pattern_to_regex(pattern)))
def action_identify(args):
def action_identify(args: argparse.Namespace) -> None:
best_match = []
all_files = get_all_files()
with open(CODEOWNERS_PATH) as f:
@ -94,7 +95,7 @@ def action_identify(args):
print(owner)
def action_test_pattern(args):
def action_test_pattern(args: argparse.Namespace) -> None:
re_pattern = pattern_to_regex(args.pattern)
if args.regex:
@ -106,10 +107,10 @@ def action_test_pattern(args):
print(f)
def action_ci_check(args):
def action_ci_check(args: argparse.Namespace) -> None:
errors = []
def add_error(msg):
def add_error(msg: str) -> None:
errors.append('{}:{}: {}'.format(CODEOWNERS_PATH, line_no, msg))
all_files = get_all_files()
@ -158,7 +159,7 @@ def action_ci_check(args):
raise SystemExit(1)
def in_order(prev, current):
def in_order(prev: str, current: str) -> bool:
"""
Return True if the ordering is correct for these two lines ('prev' should be before 'current').
@ -172,10 +173,10 @@ def in_order(prev, current):
if not prev:
return True # first element in file
def is_separator(c):
def is_separator(c: str) -> bool:
return c in '-_/' # ignore differences between separators for ordering purposes
def is_wildcard(c):
def is_wildcard(c: str) -> bool:
return c in '?*'
# looping until we see a different character
@ -192,7 +193,7 @@ def in_order(prev, current):
return len(current) >= len(prev)
def main():
def main() -> None:
parser = argparse.ArgumentParser(
sys.argv[0], description='Internal helper script for working with the CODEOWNERS file.'
)

View File

@ -2028,7 +2028,6 @@ tools/ble/lib_ble_client.py
tools/ble/lib_gap.py
tools/ble/lib_gatt.py
tools/catch/catch.hpp
tools/esp_app_trace/test/sysview/blink.c
tools/find_build_apps/__init__.py
tools/find_build_apps/cmake.py
tools/find_build_apps/common.py

View File

@ -9,6 +9,7 @@ import argparse
import os
import sys
from io import open
from typing import Set, Tuple
from check_kconfigs import valid_directory
from idf_ci_utils import get_submodule_dirs
@ -19,11 +20,11 @@ FILES_TO_CHECK = ('sdkconfig.ci', 'sdkconfig.defaults')
# ignored directories (makes sense only when run on IDF_PATH)
# Note: IGNORE_DIRS is a tuple in order to be able to use it directly with the startswith() built-in function which
# accepts tuples but no lists.
IGNORE_DIRS = (
IGNORE_DIRS: Tuple = (
)
def _parse_path(path, sep=None):
def _parse_path(path: os.PathLike[str], sep: str=None) -> Set:
ret = set()
with open(path, 'r', encoding='utf-8') as f:
for line in f:
@ -33,13 +34,13 @@ def _parse_path(path, sep=None):
return ret
def _valid_directory(path):
def _valid_directory(path: os.PathLike[str]) -> os.PathLike[str]:
if not os.path.isdir(path):
raise argparse.ArgumentTypeError('{} is not a valid directory!'.format(path))
return path
def main():
def check() -> int:
parser = argparse.ArgumentParser(description='Kconfig options checker')
parser.add_argument('files', nargs='*',
help='Kconfig files')
@ -102,5 +103,9 @@ def main():
return 0
def main() -> None:
sys.exit(check())
if __name__ == '__main__':
sys.exit(main())
main()

View File

@ -6,6 +6,7 @@
import argparse
import os
import sys
from typing import Iterable, List
try:
from idf_ci_utils import is_executable
@ -15,7 +16,7 @@ except ImportError:
from idf_ci_utils import is_executable
def _strip_each_item(iterable):
def _strip_each_item(iterable: Iterable) -> List:
res = []
for item in iterable:
if item:
@ -28,7 +29,7 @@ EXECUTABLE_LIST_FN = os.path.join(IDF_PATH, 'tools/ci/executable-list.txt')
known_executables = _strip_each_item(open(EXECUTABLE_LIST_FN).readlines())
def check_executable_list():
def check_executable_list() -> int:
ret = 0
for index, fn in enumerate(known_executables):
if not os.path.exists(os.path.join(IDF_PATH, fn)):
@ -37,7 +38,7 @@ def check_executable_list():
return ret
def check_executables(files):
def check_executables(files: List) -> int:
ret = 0
for fn in files:
fn_executable = is_executable(fn)
@ -51,7 +52,7 @@ def check_executables(files):
return ret
def main():
def check() -> int:
parser = argparse.ArgumentParser()
parser.add_argument('--action', choices=['executables', 'list'], required=True,
help='if "executables", pass all your executables to see if it\'s in the list.'
@ -70,4 +71,4 @@ def main():
if __name__ == '__main__':
sys.exit(main())
sys.exit(check())

View File

@ -18,6 +18,7 @@ import subprocess
import tempfile
from io import open
from threading import Event, Thread
from typing import List, Optional, Set, Tuple, Union
class HeaderFailed(Exception):
@ -26,28 +27,28 @@ class HeaderFailed(Exception):
class HeaderFailedSdkconfig(HeaderFailed):
def __str__(self):
def __str__(self) -> str:
return 'Sdkconfig Error'
class HeaderFailedBuildError(HeaderFailed):
def __str__(self):
def __str__(self) -> str:
return 'Header Build Error'
class HeaderFailedCppGuardMissing(HeaderFailed):
def __str__(self):
def __str__(self) -> str:
return 'Header Missing C++ Guard'
class HeaderFailedContainsCode(HeaderFailed):
def __str__(self):
def __str__(self) -> str:
return 'Header Produced non-zero object'
# Creates a temp file and returns both output as a string and a file name
#
def exec_cmd_to_temp_file(what, suffix=''):
def exec_cmd_to_temp_file(what: List, suffix: str='') -> Tuple[int, str, str, str, str]:
out_file = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
rc, out, err, cmd = exec_cmd(what, out_file)
with open(out_file.name, 'r', encoding='utf-8') as f:
@ -55,12 +56,12 @@ def exec_cmd_to_temp_file(what, suffix=''):
return rc, out, err, out_file.name, cmd
def exec_cmd(what, out_file=subprocess.PIPE):
def exec_cmd(what: List, out_file: Union[tempfile._TemporaryFileWrapper[bytes], int]=subprocess.PIPE) -> Tuple[int, str, str, str]:
p = subprocess.Popen(what, stdin=subprocess.PIPE, stdout=out_file, stderr=subprocess.PIPE)
output, err = p.communicate()
output_b, err_b = p.communicate()
rc = p.returncode
output = output.decode('utf-8') if output is not None else None
err = err.decode('utf-8') if err is not None else None
output: str = output_b.decode('utf-8') if output is not None else ''
err: str = err_b.decode('utf-8') if err is not None else ''
return rc, output, err, ' '.join(what)
@ -74,11 +75,11 @@ class PublicHeaderChecker:
PREPROC_OUT_DIFFERENT_WITH_EXT_C_HDR_OK = 6 # -> Both preprocessors produce different, non-zero output with extern "C" (header seems OK)
PREPROC_OUT_DIFFERENT_NO_EXT_C_HDR_FAILED = 7 # -> Both preprocessors produce different, non-zero output without extern "C" (header fails)
def log(self, message, debug=False):
def log(self, message: str, debug: bool=False) -> None:
if self.verbose or debug:
print(message)
def __init__(self, verbose=False, jobs=1, prefix=None):
def __init__(self, verbose: bool=False, jobs: int=1, prefix: Optional[str]=None) -> None:
self.gcc = '{}gcc'.format(prefix)
self.gpp = '{}g++'.format(prefix)
self.verbose = verbose
@ -89,26 +90,26 @@ class PublicHeaderChecker:
self.error_orphan_kconfig = re.compile(r'#error CONFIG_VARS_USED_WHILE_SDKCONFIG_NOT_INCLUDED')
self.kconfig_macro = re.compile(r'\bCONFIG_[A-Z0-9_]+')
self.assembly_nocode = r'^\s*(\.file|\.text|\.ident|\.option|\.attribute).*$'
self.check_threads = []
self.check_threads: List[Thread] = []
self.job_queue = queue.Queue()
self.failed_queue = queue.Queue()
self.job_queue: queue.Queue = queue.Queue()
self.failed_queue: queue.Queue = queue.Queue()
self.terminate = Event()
def __enter__(self):
def __enter__(self) -> 'PublicHeaderChecker':
for i in range(self.jobs):
t = Thread(target=self.check_headers, args=(i, ))
self.check_threads.append(t)
t.start()
return self
def __exit__(self, exc_type, exc_value, traceback):
def __exit__(self, exc_type: str, exc_value: str, traceback: str) -> None:
self.terminate.set()
for t in self.check_threads:
t.join()
# thread function process incoming header file from a queue
def check_headers(self, num):
def check_headers(self, num: int) -> None:
while not self.terminate.is_set():
if not self.job_queue.empty():
task = self.job_queue.get()
@ -125,10 +126,10 @@ class PublicHeaderChecker:
self.terminate.set()
raise
def get_failed(self):
def get_failed(self) -> List:
return list(self.failed_queue.queue)
def join(self):
def join(self) -> None:
for t in self.check_threads:
while t.is_alive() and not self.terminate.is_set():
t.join(1) # joins with timeout to respond to keyboard interrupt
@ -147,7 +148,7 @@ class PublicHeaderChecker:
# - Fail the test if the preprocessor outputs are the same (but with some code)
# - If outputs different, pass the test
# 4) If header passed the steps 1) and 3) test that it produced zero assembly code
def check_one_header(self, header, num):
def check_one_header(self, header: str, num: int) -> None:
res = self.preprocess_one_header(header, num)
if res == self.COMPILE_ERR_REF_CONFIG_HDR_FAILED:
raise HeaderFailedSdkconfig()
@ -173,7 +174,7 @@ class PublicHeaderChecker:
if temp_header:
os.unlink(temp_header)
def compile_one_header(self, header):
def compile_one_header(self, header: str) -> None:
rc, out, err, cmd = exec_cmd([self.gcc, '-S', '-o-', '-include', header, self.main_c] + self.include_dir_flags)
if rc == 0:
if not re.sub(self.assembly_nocode, '', out, flags=re.M).isspace():
@ -184,7 +185,7 @@ class PublicHeaderChecker:
self.log('\nCompilation command failed:\n{}\n'.format(cmd), True)
raise HeaderFailedBuildError()
def preprocess_one_header(self, header, num, ignore_sdkconfig_issue=False):
def preprocess_one_header(self, header: str, num: int, ignore_sdkconfig_issue: bool=False) -> int:
all_compilation_flags = ['-w', '-P', '-E', '-DESP_PLATFORM', '-include', header, self.main_c] + self.include_dir_flags
if not ignore_sdkconfig_issue:
        # just strip comments to check for CONFIG_... macros
@ -232,8 +233,10 @@ class PublicHeaderChecker:
pass
# Get compilation data from an example to list all public header files
def list_public_headers(self, ignore_dirs, ignore_files, only_dir=None):
def list_public_headers(self, ignore_dirs: List, ignore_files: Union[List, Set], only_dir: str=None) -> None:
idf_path = os.getenv('IDF_PATH')
if idf_path is None:
raise RuntimeError("Environment variable 'IDF_PATH' wasn't set.")
project_dir = os.path.join(idf_path, 'examples', 'get-started', 'blink')
build_dir = tempfile.mkdtemp()
sdkconfig = os.path.join(build_dir, 'sdkconfig')
@ -283,22 +286,22 @@ class PublicHeaderChecker:
self.include_dir_flags = include_dir_flags
ignore_files = set(ignore_files)
# processes public include files, removing ignored files
for f in all_include_files:
rel_path_file = os.path.relpath(f, idf_path)
for file_name in all_include_files:
rel_path_file = os.path.relpath(file_name, idf_path)
if any([os.path.commonprefix([d, rel_path_file]) == d for d in ignore_dirs]):
self.log('{} - file ignored (inside ignore dir)'.format(f))
self.log('{} - file ignored (inside ignore dir)'.format(file_name))
continue
if rel_path_file in ignore_files:
self.log('{} - file ignored'.format(f))
self.log('{} - file ignored'.format(file_name))
continue
files_to_check.append(f)
files_to_check.append(file_name)
# removes duplicates and places headers to a work queue
for f in set(files_to_check):
self.job_queue.put(f)
for file_name in set(files_to_check):
self.job_queue.put(file_name)
self.job_queue.put(None) # to indicate the last job
def check_all_headers():
def check_all_headers() -> None:
parser = argparse.ArgumentParser('Public header checker file', formatter_class=argparse.RawDescriptionHelpFormatter, epilog='''\
Tips for fixing failures reported by this script
------------------------------------------------

View File

@ -16,6 +16,7 @@ import urllib.error
import urllib.request
from collections import defaultdict, namedtuple
from pathlib import Path
from typing import List
EXCLUDE_DOCS_LIST = ['examples/peripherals/secure_element/atecc608_ecdsa/components/esp-cryptoauthlib/cryptoauthlib/**']
@ -26,28 +27,28 @@ Link = namedtuple('Link', ['file', 'url'])
class ReadmeLinkError(Exception):
def __init__(self, file, url):
def __init__(self, file: str, url: str) -> None:
self.file = file
self.url = url
class RelativeLinkError(ReadmeLinkError):
def __str__(self):
def __str__(self) -> str:
return 'Relative link error, file - {} not found, linked from {}'.format(self.url, self.file)
class UrlLinkError(ReadmeLinkError):
def __init__(self, file, url, error_code):
def __init__(self, file: str, url: str, error_code: str):
self.error_code = error_code
super().__init__(file, url)
def __str__(self):
def __str__(self) -> str:
files = [str(f) for f in self.file]
return 'URL error, url - {} in files - {} is not accessible, request returned {}'.format(self.url, ', '.join(files), self.error_code)
# we do not want a failed test just due to bad network conditions, for non 404 errors we simply print a warning
def check_url(url, files, timeout):
def check_url(url: str, files: str, timeout: float) -> None:
try:
with urllib.request.urlopen(url, timeout=timeout):
return
@ -60,7 +61,7 @@ def check_url(url, files, timeout):
print('Unable to access {}, err = {}'.format(url, str(e)))
def check_web_links(web_links):
def check_web_links(web_links: defaultdict) -> List:
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
errors = []
@ -74,7 +75,7 @@ def check_web_links(web_links):
return errors
def check_file_links(file_links):
def check_file_links(file_links: List) -> List:
errors = []
for link in file_links:
@ -87,10 +88,13 @@ def check_file_links(file_links):
return errors
def get_md_links(folder):
def get_md_links(folder: str) -> List:
MD_LINK_RE = r'\[.+?\]\((.+?)(#.+)?\)'
idf_path = Path(os.getenv('IDF_PATH'))
idf_path_str = os.getenv('IDF_PATH')
if idf_path_str is None:
raise RuntimeError("Environment variable 'IDF_PATH' wasn't set.")
idf_path = Path(idf_path_str)
links = []
for path in (idf_path / folder).rglob('*.md'):
@ -110,7 +114,7 @@ def get_md_links(folder):
return links
def check_readme_links(args):
def check_readme_links(args: argparse.Namespace) -> int:
links = get_md_links('examples')
print('Found {} links'.format(len(links)))

View File

@ -12,7 +12,7 @@ import os
import re
import sys
from copy import deepcopy
from typing import List
from typing import Any, Dict, List, Optional, Set, Union
import yaml
from idf_ci_utils import IDF_PATH
@ -20,20 +20,20 @@ from idf_ci_utils import IDF_PATH
ROOT_YML_FP = os.path.join(IDF_PATH, '.gitlab-ci.yml')
def load_yaml(file_path):
def load_yaml(file_path: str) -> Any:
return yaml.load(open(file_path), Loader=yaml.FullLoader)
class YMLConfig:
def __init__(self, root_yml_file_path):
self._config = None
self._all_extends = None
def __init__(self, root_yml_file_path: str) -> None:
self._config: Optional[Dict] = None
self._all_extends: Optional[Set] = None
self.root_yml = load_yaml(root_yml_file_path)
assert self.root_yml
@staticmethod
def _list(str_or_list):
def _list(str_or_list: Union[str, List]) -> List:
if isinstance(str_or_list, str):
return [str_or_list]
if isinstance(str_or_list, list):
@ -43,7 +43,7 @@ class YMLConfig:
)
@property
def config(self):
def config(self) -> Dict:
if self._config:
return self._config
@ -54,7 +54,7 @@ class YMLConfig:
return self._config
@property
def all_extends(self):
def all_extends(self) -> Set:
if self._all_extends:
return self._all_extends
@ -67,7 +67,7 @@ class YMLConfig:
self._all_extends = res
return self._all_extends
def exists(self, key):
def exists(self, key: str) -> bool:
if key in self.all_extends:
return True
return False
@ -76,7 +76,7 @@ class YMLConfig:
YML_CONFIG = YMLConfig(ROOT_YML_FP)
def validate_needed_rules(rules_yml):
def validate_needed_rules(rules_yml: os.PathLike[str]) -> int:
res = 0
needed_rules = deepcopy(YML_CONFIG.all_extends)
with open(rules_yml) as fr:
@ -114,7 +114,7 @@ def parse_submodule_paths(
return res
def validate_submodule_patterns():
def validate_submodule_patterns() -> int:
submodule_paths = sorted(['.gitmodules'] + parse_submodule_paths())
submodule_paths_in_patterns = sorted(
YML_CONFIG.config.get('.patterns-submodule', [])

View File

@ -7,19 +7,20 @@ import argparse
import os
import sys
from pathlib import Path
from typing import Set, Tuple
import yaml
from idf_ci_utils import IDF_PATH, get_git_files
def check(pattern_yml, exclude_list):
def check(pattern_yml: str, exclude_list: str) -> Tuple[Set, Set]:
rules_dict = yaml.load(open(pattern_yml), Loader=yaml.FullLoader)
rules_patterns_set = set()
for k, v in rules_dict.items():
if k.startswith('.pattern') and k != '.patterns-python-files' and isinstance(v, list):
rules_patterns_set.update(v)
rules_files_set = set()
rules_files_set: Set = set()
idf_path = Path(IDF_PATH)
for pat in rules_patterns_set:
rules_files_set.update(idf_path.glob(pat))
@ -30,7 +31,7 @@ def check(pattern_yml, exclude_list):
if pat:
exclude_patterns_set.add(pat)
exclude_files_set = set()
exclude_files_set: Set = set()
for pat in exclude_patterns_set:
exclude_files_set.update(idf_path.glob(pat))

View File

@ -10,27 +10,28 @@ import json
import os
import re
import subprocess
from typing import List
IDF_GIT_DESCRIBE_PATTERN = re.compile(r'^v(\d)\.(\d)')
RETRY_COUNT = 3
def get_customized_project_revision(proj_name):
def get_customized_project_revision(proj_name: str) -> str:
"""
get customized project revision defined in bot message
"""
revision = ''
customized_project_revisions = os.getenv('BOT_CUSTOMIZED_REVISION')
if customized_project_revisions:
customized_project_revisions = json.loads(customized_project_revisions)
try:
revision = customized_project_revisions[proj_name.lower()]
except (KeyError, TypeError):
pass
customized_project_revisions_file = os.getenv('BOT_CUSTOMIZED_REVISION')
if customized_project_revisions_file:
customized_project_revisions = json.loads(customized_project_revisions_file)
try:
revision = customized_project_revisions[proj_name.lower()]
except (KeyError, TypeError):
pass
return revision
def target_branch_candidates(proj_name):
def target_branch_candidates(proj_name: str) -> List:
"""
:return: a list of target branch candidates, from highest priority to lowest priority.
"""

View File

@ -11,6 +11,7 @@ import re
import shutil
import subprocess
import time
from typing import Any, List
import gitlab_api
@ -28,27 +29,26 @@ class SubModule(object):
GIT_LS_TREE_OUTPUT_PATTERN = re.compile(r'\d+\s+commit\s+([0-9a-f]+)\s+')
def __init__(self, gitlab_inst, path, url):
def __init__(self, gitlab_inst: gitlab_api.Gitlab, path: str, url: str) -> None:
self.path = path
self.url = url
self.gitlab_inst = gitlab_inst
self.project_id = self._get_project_id(url)
self.commit_id = self._get_commit_id(path)
def _get_commit_id(self, path):
output = subprocess.check_output(['git', 'ls-tree', 'HEAD', path])
output = output.decode()
def _get_commit_id(self, path: str) -> str:
output = subprocess.check_output(['git', 'ls-tree', 'HEAD', path]).decode()
# example output: 160000 commit d88a262fbdf35e5abb372280eb08008749c3faa0 components/esp_wifi/lib
match = self.GIT_LS_TREE_OUTPUT_PATTERN.search(output)
return match.group(1)
return match.group(1) if match is not None else ''
def _get_project_id(self, url):
def _get_project_id(self, url: str) -> Any:
base_name = os.path.basename(url)
project_id = self.gitlab_inst.get_project_id(os.path.splitext(base_name)[0], # remove .git
namespace='espressif')
return project_id
def download_archive(self):
def download_archive(self) -> None:
print('Update submodule: {}: {}'.format(self.path, self.commit_id))
path_name = self.gitlab_inst.download_archive(self.commit_id, SUBMODULE_ARCHIVE_TEMP_FOLDER,
self.project_id, SUBMODULE_ARCHIVE_CACHE_DIR)
@ -58,33 +58,34 @@ class SubModule(object):
shutil.move(renamed_path, os.path.dirname(self.path))
def update_submodule(git_module_file, submodules_to_update):
def update_submodule(git_module_file: str, submodules_to_update: List) -> None:
gitlab_inst = gitlab_api.Gitlab()
submodules = []
with open(git_module_file, 'r') as f:
data = f.read()
match = SUBMODULE_PATTERN.search(data)
while True:
next_match = SUBMODULE_PATTERN.search(data, pos=match.end())
if next_match:
end_pos = next_match.start()
else:
end_pos = len(data)
path_match = PATH_PATTERN.search(data, pos=match.end(), endpos=end_pos)
url_match = URL_PATTERN.search(data, pos=match.end(), endpos=end_pos)
path = path_match.group(1)
url = url_match.group(1)
if match is not None:
while True:
next_match = SUBMODULE_PATTERN.search(data, pos=match.end())
if next_match:
end_pos = next_match.start()
else:
end_pos = len(data)
path_match = PATH_PATTERN.search(data, pos=match.end(), endpos=end_pos)
url_match = URL_PATTERN.search(data, pos=match.end(), endpos=end_pos)
path = path_match.group(1) if path_match is not None else ''
url = url_match.group(1) if url_match is not None else ''
filter_result = True
if submodules_to_update:
if path not in submodules_to_update:
filter_result = False
if filter_result:
submodules.append(SubModule(gitlab_inst, path, url))
filter_result = True
if submodules_to_update:
if path not in submodules_to_update:
filter_result = False
if filter_result:
submodules.append(SubModule(gitlab_inst, path, url))
match = next_match
if not match:
break
match = next_match
if not match:
break
shutil.rmtree(SUBMODULE_ARCHIVE_TEMP_FOLDER, ignore_errors=True)

View File

@ -14,11 +14,12 @@ import stat
import subprocess
import sys
import tarfile
from typing import Any, List, Optional, Tuple
import packaging.version
def env(variable: str, default: Optional[str] = None) -> str:
    """Return the expanded value of the environment variable *variable*.

    If *default* is a non-empty string, a missing variable falls back to it;
    without a (truthy) default, a missing variable raises KeyError — useful
    in CI where absent variables should fail loudly.

    NOTE(review): a falsy default (e.g. '') is ignored by the ``if default``
    test, so ``env(var, '')`` behaves like ``env(var)`` and may raise
    KeyError. Preserved as-is since callers may rely on the fail-loud path.

    :param variable: name of the environment variable to read
    :param default: fallback value used only when truthy
    :return: the value with any embedded ``$VAR`` references expanded
    :raises KeyError: if the variable is unset and no truthy default is given
    """
    if default:
        value = os.environ.get(variable, default)
    else:
        value = os.environ[variable]
    # Expand nested $VAR references so e.g. DOCS_PATH can reference IDF_PATH.
    return os.path.expandvars(value)
@ -28,7 +29,7 @@ sys.path.append(os.path.join(env('IDF_PATH'), 'docs'))
from sanitize_version import sanitize_version # noqa
def main():
def main() -> None:
# if you get KeyErrors on the following lines, it's probably because you're not running in Gitlab CI
git_ver = env('GIT_VER') # output of git describe --always
ci_ver = env('CI_COMMIT_REF_NAME', git_ver) # branch or tag we're building for (used for 'release' & URL)
@ -87,8 +88,8 @@ def main():
deploy('stable', tarball_path, docs_path, docs_server)
# NOTE(review): diff-rendering residue — lines pair up as pre-commit (untyped)
# and post-commit (typed) signatures of `deploy` and its nested `run_ssh`;
# indentation was stripped by the renderer and deploy's body continues past
# the hunk marker below this span.
def deploy(version, tarball_path, docs_path, docs_server):
def run_ssh(commands):
def deploy(version: str, tarball_path: str, docs_path: str, docs_server: str) -> None:
# Nested helper: closes over docs_server from deploy's parameters.
def run_ssh(commands: List[str]) -> None:
""" Log into docs_server and run a sequence of commands using ssh """
print('Running ssh: {}'.format(commands))
# BatchMode=yes makes ssh fail instead of prompting for a password in CI;
# the commands are chained with '&&' so the first failure aborts the rest.
subprocess.run(['ssh', '-o', 'BatchMode=yes', docs_server, '-x', ' && '.join(commands)], check=True)
@ -110,7 +111,7 @@ def deploy(version, tarball_path, docs_path, docs_server):
# another thing made much more complex by the directory structure putting language before version...
def build_doc_tarball(version, git_ver, build_dir):
def build_doc_tarball(version: str, git_ver: str, build_dir: str) -> Tuple[str, List]:
""" Make a tar.gz archive of the docs, in the directory structure used to deploy as
the given version """
version_paths = []
@ -126,7 +127,8 @@ def build_doc_tarball(version, git_ver, build_dir):
# add symlink for stable and latest and adds them to PDF blob
symlinks = create_and_add_symlinks(version, git_ver, pdfs)
# NOTE(review): diff-rendering residue — pre-commit and post-commit
# signatures of the same tarfile filter callback; the function's else-branch
# lies beyond the next hunk marker and is not visible here.
def not_sources_dir(ti):
def not_sources_dir(ti: Any) -> Any:
# NOTE(review): debug leftover — this print runs on every tarball member,
# and because it precedes the string below, that string is no longer a
# docstring (just a discarded expression). Recommend removing the print.
print(type(ti))
""" Filter the _sources directories out of the tarballs """
# Returning None from a tarfile filter excludes the member from the archive.
if ti.name.endswith('/_sources'):
return None
@ -171,7 +173,7 @@ def build_doc_tarball(version, git_ver, build_dir):
return (os.path.abspath(tarball_path), version_paths)
def create_and_add_symlinks(version, git_ver, pdfs):
def create_and_add_symlinks(version: str, git_ver: str, pdfs: List) -> List:
""" Create symbolic links for PDFs for 'latest' and 'stable' releases """
symlinks = []
@ -187,7 +189,7 @@ def create_and_add_symlinks(version, git_ver, pdfs):
return symlinks
# NOTE(review): diff-rendering residue — pre-commit and post-commit
# signatures of the same function; the body continues past the hunk marker
# below this span.
def is_stable_version(version):
def is_stable_version(version: str) -> bool:
""" Heuristic for whether this is the latest stable release """
# Release tags start with 'v'; anything else is a branch name.
if not version.startswith('v'):
return False # branch name
@ -197,11 +199,11 @@ def is_stable_version(version):
# NOTE(review): diff-rendering residue (interior of is_stable_version) —
# consecutive near-identical lines are pre-commit/post-commit pairs; the
# post-commit version strips candidates before matching and renames the
# parsed list to versions_pack (the str list is reused as regex input above).
git_out = subprocess.check_output(['git', 'tag', '-l']).decode('utf-8')
versions = [v.strip() for v in git_out.split('\n')]
versions = [v for v in versions if re.match(r'^v[\d\.]+$', v)] # include vX.Y.Z only
versions = [v for v in versions if re.match(r'^v[\d\.]+$', v.strip())] # include vX.Y.Z only
versions = [packaging.version.parse(v) for v in versions]
versions_pack = [packaging.version.parse(v) for v in versions]
max_version = max(versions)
max_version = max(versions_pack)
# Stable release == the highest tag; version[1:] drops the leading 'v'.
if max_version.public != version[1:]:
print('Stable version is v{}. This version is {}.'.format(max_version.public, version))

View File

@ -9,7 +9,7 @@ import os
import sys
def main():
def main() -> None:
# Sanitize environment variables
vars_to_remove = []
for var_name in os.environ.keys():

View File

@ -141,21 +141,7 @@ examples/wifi/iperf/iperf_test.py
tools/ble/lib_ble_client.py
tools/ble/lib_gap.py
tools/ble/lib_gatt.py
tools/ci/check_artifacts_expire_time.py
tools/ci/check_callgraph.py
tools/ci/check_codeowners.py
tools/ci/check_deprecated_kconfigs.py
tools/ci/check_executables.py
tools/ci/check_kconfigs.py
tools/ci/check_public_headers.py
tools/ci/check_readme_links.py
tools/ci/check_rules_yml.py
tools/ci/check_tools_files_patterns.py
tools/ci/checkout_project_ref.py
tools/ci/ci_fetch_submodule.py
tools/ci/deploy_docs.py
tools/ci/envsubst.py
tools/ci/normalize_clangtidy_path.py
tools/ci/python_packages/idf_http_server_test/adder.py
tools/ci/python_packages/idf_http_server_test/client.py
tools/ci/python_packages/idf_http_server_test/test.py