diff --git a/tools/ci/check_artifacts_expire_time.py b/tools/ci/check_artifacts_expire_time.py index be9193e0bc..ff298cac38 100644 --- a/tools/ci/check_artifacts_expire_time.py +++ b/tools/ci/check_artifacts_expire_time.py @@ -17,14 +17,14 @@ if not IDF_PATH: GITLAB_CONFIG_FILE = os.path.join(IDF_PATH, '.gitlab-ci.yml') -def check_artifacts_expire_time(): +def check_artifacts_expire_time() -> None: with open(GITLAB_CONFIG_FILE, 'r') as f: config = yaml.load(f, Loader=yaml.FullLoader) # load files listed in `include` if 'include' in config: for _file in config['include']: - with open(os.path.join(IDF_PATH, _file)) as f: + with open(os.path.join(IDF_PATH or '', _file)) as f: config.update(yaml.load(f, Loader=yaml.FullLoader)) print('expire time for jobs:') diff --git a/tools/ci/check_callgraph.py b/tools/ci/check_callgraph.py index 1eaa6ee685..08b89cc2c2 100755 --- a/tools/ci/check_callgraph.py +++ b/tools/ci/check_callgraph.py @@ -8,15 +8,11 @@ import argparse import os import re from functools import partial +from typing import BinaryIO, Callable, Dict, Generator, List, Optional, Tuple import elftools from elftools.elf import elffile -try: - from typing import BinaryIO, Callable, Dict, Generator, List, Optional, Tuple -except ImportError: - pass - FUNCTION_REGEX = re.compile( r'^;; Function (?P.*)\s+\((?P\S+)(,.*)?\).*$' ) @@ -25,29 +21,29 @@ SYMBOL_REF_REGEX = re.compile(r'^.*\(symbol_ref[^()]*\("(?P.*)"\).*$') class RtlFunction(object): - def __init__(self, name, rtl_filename, tu_filename): + def __init__(self, name: str, rtl_filename: str, tu_filename: str) -> None: self.name = name self.rtl_filename = rtl_filename self.tu_filename = tu_filename - self.calls = list() # type: List[str] - self.refs = list() # type: List[str] + self.calls: List[str] = list() + self.refs: List[str] = list() self.sym = None class SectionAddressRange(object): - def __init__(self, name, addr, size): # type: (str, int, int) -> None + def __init__(self, name: str, addr: int, size: int) -> None: self.name = name self.low = addr self.high = addr + size - def __str__(self): + def __str__(self) -> str: return '{}: 0x{:08x} - 0x{:08x}'.format(self.name, self.low, self.high) - def contains_address(self, addr): + def contains_address(self, addr: int) -> bool: return self.low <= addr < self.high -TARGET_SECTIONS = { +TARGET_SECTIONS: Dict[str, List[SectionAddressRange]] = { 'esp32': [ SectionAddressRange('.rom.text', 0x40000000, 0x70000), SectionAddressRange('.rom.rodata', 0x3ff96000, 0x9018) @@ -60,20 +56,20 @@ TARGET_SECTIONS = { SectionAddressRange('.rom.text', 0x40000000, 0x568d0), SectionAddressRange('.rom.rodata', 0x3ff071c0, 0x8e30) ] -} # type: Dict[str, List[SectionAddressRange]] +} class Symbol(object): - def __init__(self, name, addr, local, filename, section): # type: (str, int, bool, Optional[str], Optional[str]) -> None + def __init__(self, name: str, addr: int, local: bool, filename: Optional[str], section: Optional[str]) -> None: self.name = name self.addr = addr self.local = local self.filename = filename self.section = section - self.refers_to = list() # type: List[Symbol] - self.referred_from = list() # type: List[Symbol] + self.refers_to: List[Symbol] = list() + self.referred_from: List[Symbol] = list() - def __str__(self): + def __str__(self) -> str: return '{} @0x{:08x} [{}]{} {}'.format( self.name, self.addr, @@ -84,11 +80,11 @@ class Symbol(object): class Reference(object): - def __init__(self, from_sym, to_sym): # type: (Symbol, Symbol) -> None + def __init__(self, from_sym: Symbol, to_sym: Symbol) -> None: self.from_sym = from_sym self.to_sym = to_sym - def __str__(self): + def __str__(self) -> str: return '{} @0x{:08x} ({}) -> {} @0x{:08x} ({})'.format( self.from_sym.name, self.from_sym.addr, @@ -100,13 +96,13 @@ class Reference(object): class ElfInfo(object): - def __init__(self, elf_file): # type: (BinaryIO) -> None + def __init__(self, elf_file: BinaryIO) -> None: self.elf_file = elf_file self.elf_obj = elffile.ELFFile(self.elf_file) self.section_ranges = self._load_sections() self.symbols = self._load_symbols() - def _load_symbols(self): # type: () -> List[Symbol] + def _load_symbols(self) -> List[Symbol]: symbols = [] for s in self.elf_obj.iter_sections(): if not isinstance(s, elftools.elf.sections.SymbolTableSection): @@ -130,7 +126,7 @@ class ElfInfo(object): ) return symbols - def _load_sections(self): # type: () -> List[SectionAddressRange] + def _load_sections(self) -> List[SectionAddressRange]: result = [] for segment in self.elf_obj.iter_segments(): if segment['p_type'] == 'PT_LOAD': @@ -149,22 +145,22 @@ class ElfInfo(object): return result - def symbols_by_name(self, name): # type: (str) -> List[Symbol] + def symbols_by_name(self, name: str) -> List['Symbol']: res = [] for sym in self.symbols: if sym.name == name: res.append(sym) return res - def section_for_addr(self, sym_addr): # type: (int) -> Optional[str] + def section_for_addr(self, sym_addr: int) -> Optional[str]: for sar in self.section_ranges: if sar.contains_address(sym_addr): return sar.name return None -def load_rtl_file(rtl_filename, tu_filename, functions): # type: (str, str, List[RtlFunction]) -> None - last_function = None # type: Optional[RtlFunction] +def load_rtl_file(rtl_filename: str, tu_filename: str, functions: List[RtlFunction]) -> None: + last_function: Optional[RtlFunction] = None for line in open(rtl_filename): # Find function definition match = re.match(FUNCTION_REGEX, line) @@ -192,7 +188,7 @@ def load_rtl_file(rtl_filename, tu_filename, functions): # type: (str, str, Lis continue -def rtl_filename_matches_sym_filename(rtl_filename, symbol_filename): # type: (str, str) -> bool +def rtl_filename_matches_sym_filename(rtl_filename: str, symbol_filename: str) -> bool: # Symbol file names (from ELF debug info) are short source file names, without path: "cpu_start.c". # RTL file names are paths relative to the build directory, e.g.: # "build/esp-idf/esp_system/CMakeFiles/__idf_esp_system.dir/port/cpu_start.c.234r.expand" @@ -211,7 +207,7 @@ class SymbolNotFound(RuntimeError): pass -def find_symbol_by_name(name, elfinfo, local_func_matcher): # type: (str, ElfInfo, Callable[[Symbol], bool]) -> Optional[Symbol] +def find_symbol_by_name(name: str, elfinfo: ElfInfo, local_func_matcher: Callable[[Symbol], bool]) -> Optional[Symbol]: """ Find an ELF symbol for the given name. local_func_matcher is a callback function which checks is the candidate local symbol is suitable. @@ -238,7 +234,7 @@ def find_symbol_by_name(name, elfinfo, local_func_matcher): # type: (str, ElfIn return local_candidate or global_candidate -def match_local_source_func(rtl_filename, sym): # type: (str, Symbol) -> bool +def match_local_source_func(rtl_filename: str, sym: Symbol) -> bool: """ Helper for match_rtl_funcs_to_symbols, checks if local symbol sym is a good candidate for the reference source (caller), based on the RTL file name. @@ -247,7 +243,7 @@ def match_local_source_func(rtl_filename, sym): # type: (str, Symbol) -> bool return rtl_filename_matches_sym_filename(rtl_filename, sym.filename) -def match_local_target_func(rtl_filename, sym_from, sym): # type: (str, Symbol, Symbol) -> bool +def match_local_target_func(rtl_filename: str, sym_from: Symbol, sym: Symbol) -> bool: """ Helper for match_rtl_funcs_to_symbols, checks if local symbol sym is a good candidate for the reference target (callee or referenced data), based on RTL filename of the source symbol @@ -263,9 +259,9 @@ def match_local_target_func(rtl_filename, sym_from, sym): # type: (str, Symbol, return rtl_filename_matches_sym_filename(rtl_filename, sym.filename) -def match_rtl_funcs_to_symbols(rtl_functions, elfinfo): # type: (List[RtlFunction], ElfInfo) -> Tuple[List[Symbol], List[Reference]] - symbols = [] # type: List[Symbol] - refs = [] # type: List[Reference] +def match_rtl_funcs_to_symbols(rtl_functions: List[RtlFunction], elfinfo: ElfInfo) -> Tuple[List[Symbol], List[Reference]]: + symbols: List[Symbol] = [] + refs: List[Reference] = [] # General idea: # - iterate over RTL functions. @@ -308,17 +304,17 @@ def match_rtl_funcs_to_symbols(rtl_functions, elfinfo): # type: (List[RtlFuncti return symbols, refs -def get_symbols_and_refs(rtl_list, elf_file): # type: (List[str], BinaryIO) -> Tuple[List[Symbol], List[Reference]] +def get_symbols_and_refs(rtl_list: List[str], elf_file: BinaryIO) -> Tuple[List[Symbol], List[Reference]]: elfinfo = ElfInfo(elf_file) - rtl_functions = [] # type: List[RtlFunction] + rtl_functions: List[RtlFunction] = [] for file_name in rtl_list: load_rtl_file(file_name, file_name, rtl_functions) return match_rtl_funcs_to_symbols(rtl_functions, elfinfo) -def list_refs_from_to_sections(refs, from_sections, to_sections): # type: (List[Reference], List[str], List[str]) -> int +def list_refs_from_to_sections(refs: List[Reference], from_sections: List[str], to_sections: List[str]) -> int: found = 0 for ref in refs: if (not from_sections or ref.from_sym.section in from_sections) and \ @@ -328,7 +324,7 @@ def list_refs_from_to_sections(refs, from_sections, to_sections): # type: (List return found -def find_files_recursive(root_path, ext): # type: (str, str) -> Generator[str, None, None] +def find_files_recursive(root_path: str, ext: str) -> Generator[str, None, None]: for root, _, files in os.walk(root_path): for basename in files: if basename.endswith(ext): @@ -336,7 +332,7 @@ def find_files_recursive(root_path, ext): # type: (str, str) -> Generator[str, yield filename -def main(): +def main() -> None: parser = argparse.ArgumentParser() parser.add_argument( @@ -379,7 +375,7 @@ def main(): args = parser.parse_args() if args.rtl_list: with open(args.rtl_list, 'r') as rtl_list_file: - rtl_list = [line.strip for line in rtl_list_file] + rtl_list = [line.strip() for line in rtl_list_file] else: if not args.rtl_dir: raise RuntimeError('Either --rtl-list or --rtl-dir must be specified') diff --git a/tools/ci/check_codeowners.py b/tools/ci/check_codeowners.py index 3c7010b28e..89f3cf6df8 100755 --- a/tools/ci/check_codeowners.py +++ b/tools/ci/check_codeowners.py @@ -10,6 +10,7 @@ import os import re import subprocess import sys +from typing import List, Optional from idf_ci_utils import IDF_PATH @@ -17,7 +18,7 @@ CODEOWNERS_PATH = os.path.join(IDF_PATH, '.gitlab', 'CODEOWNERS') CODEOWNER_GROUP_PREFIX = '@esp-idf-codeowners/' -def get_all_files(): +def get_all_files() -> List[str]: """ Get list of all file paths in the repository. """ @@ -25,7 +26,7 @@ def get_all_files(): return subprocess.check_output(['git', 'ls-files'], cwd=IDF_PATH).decode('utf-8').strip().split('\n') -def pattern_to_regex(pattern): +def pattern_to_regex(pattern: str) -> str: """ Convert the CODEOWNERS path pattern into a regular expression string. """ @@ -59,14 +60,14 @@ def pattern_to_regex(pattern): return re_pattern -def files_by_regex(all_files, regex): +def files_by_regex(all_files: List, regex: re.Pattern) -> List: """ Return all files in the repository matching the given regular expresion. """ return [file for file in all_files if regex.search('/' + file)] -def files_by_pattern(all_files, pattern=None): +def files_by_pattern(all_files: list, pattern: Optional[str]=None) -> List: """ Return all the files in the repository matching the given CODEOWNERS pattern. """ @@ -76,7 +77,7 @@ def files_by_pattern(all_files, pattern=None): return files_by_regex(all_files, re.compile(pattern_to_regex(pattern))) -def action_identify(args): +def action_identify(args: argparse.Namespace) -> None: best_match = [] all_files = get_all_files() with open(CODEOWNERS_PATH) as f: @@ -94,7 +95,7 @@ def action_identify(args): print(owner) -def action_test_pattern(args): +def action_test_pattern(args: argparse.Namespace) -> None: re_pattern = pattern_to_regex(args.pattern) if args.regex: @@ -106,10 +107,10 @@ def action_test_pattern(args): print(f) -def action_ci_check(args): +def action_ci_check(args: argparse.Namespace) -> None: errors = [] - def add_error(msg): + def add_error(msg: str) -> None: errors.append('{}:{}: {}'.format(CODEOWNERS_PATH, line_no, msg)) all_files = get_all_files() @@ -158,7 +159,7 @@ def action_ci_check(args): raise SystemExit(1) -def in_order(prev, current): +def in_order(prev: str, current: str) -> bool: """ Return True if the ordering is correct for these two lines ('prev' should be before 'current'). @@ -172,10 +173,10 @@ def in_order(prev, current): if not prev: return True # first element in file - def is_separator(c): + def is_separator(c: str) -> bool: return c in '-_/' # ignore differences between separators for ordering purposes - def is_wildcard(c): + def is_wildcard(c: str) -> bool: return c in '?*' # looping until we see a different character @@ -192,7 +193,7 @@ def in_order(prev, current): return len(current) >= len(prev) -def main(): +def main() -> None: parser = argparse.ArgumentParser( sys.argv[0], description='Internal helper script for working with the CODEOWNERS file.' ) diff --git a/tools/ci/check_copyright_ignore.txt b/tools/ci/check_copyright_ignore.txt index dd57bf50bf..18dd69dbfb 100644 --- a/tools/ci/check_copyright_ignore.txt +++ b/tools/ci/check_copyright_ignore.txt @@ -2028,7 +2028,6 @@ tools/ble/lib_ble_client.py tools/ble/lib_gap.py tools/ble/lib_gatt.py tools/catch/catch.hpp -tools/esp_app_trace/test/sysview/blink.c tools/find_build_apps/__init__.py tools/find_build_apps/cmake.py tools/find_build_apps/common.py diff --git a/tools/ci/check_deprecated_kconfigs.py b/tools/ci/check_deprecated_kconfigs.py index e10585a11e..e3a3d8e1ab 100755 --- a/tools/ci/check_deprecated_kconfigs.py +++ b/tools/ci/check_deprecated_kconfigs.py @@ -9,6 +9,7 @@ import argparse import os import sys from io import open +from typing import Set, Tuple from check_kconfigs import valid_directory from idf_ci_utils import get_submodule_dirs @@ -19,11 +20,11 @@ FILES_TO_CHECK = ('sdkconfig.ci', 'sdkconfig.defaults') # ignored directories (makes sense only when run on IDF_PATH) # Note: IGNORE_DIRS is a tuple in order to be able to use it directly with the startswith() built-in function which # accepts tuples but no lists. -IGNORE_DIRS = ( +IGNORE_DIRS: Tuple = ( ) -def _parse_path(path, sep=None): +def _parse_path(path: os.PathLike[str], sep: str=None) -> Set: ret = set() with open(path, 'r', encoding='utf-8') as f: for line in f: @@ -33,13 +34,13 @@ def _parse_path(path, sep=None): return ret -def _valid_directory(path): +def _valid_directory(path: os.PathLike[str]) -> os.PathLike[str]: if not os.path.isdir(path): raise argparse.ArgumentTypeError('{} is not a valid directory!'.format(path)) return path -def main(): +def check() -> int: parser = argparse.ArgumentParser(description='Kconfig options checker') parser.add_argument('files', nargs='*', help='Kconfig files') @@ -102,5 +103,9 @@ def main(): return 0 +def main() -> None: + sys.exit(check()) + + if __name__ == '__main__': - sys.exit(main()) + main() diff --git a/tools/ci/check_executables.py b/tools/ci/check_executables.py index c53481de52..ea9ee35cf2 100755 --- a/tools/ci/check_executables.py +++ b/tools/ci/check_executables.py @@ -6,6 +6,7 @@ import argparse import os import sys +from typing import Iterable, List try: from idf_ci_utils import is_executable @@ -15,7 +16,7 @@ except ImportError: from idf_ci_utils import is_executable -def _strip_each_item(iterable): +def _strip_each_item(iterable: Iterable) -> List: res = [] for item in iterable: if item: @@ -28,7 +29,7 @@ EXECUTABLE_LIST_FN = os.path.join(IDF_PATH, 'tools/ci/executable-list.txt') known_executables = _strip_each_item(open(EXECUTABLE_LIST_FN).readlines()) -def check_executable_list(): +def check_executable_list() -> int: ret = 0 for index, fn in enumerate(known_executables): if not os.path.exists(os.path.join(IDF_PATH, fn)): @@ -37,7 +38,7 @@ def check_executable_list(): return ret -def check_executables(files): +def check_executables(files: List) -> int: ret = 0 for fn in files: fn_executable = is_executable(fn) @@ -51,7 +52,7 @@ def check_executables(files): return ret -def main(): +def check() -> int: parser = argparse.ArgumentParser() parser.add_argument('--action', choices=['executables', 'list'], required=True, help='if "executables", pass all your executables to see if it\'s in the list.' @@ -70,4 +71,4 @@ def main(): if __name__ == '__main__': - sys.exit(main()) + sys.exit(check()) diff --git a/tools/ci/check_public_headers.py b/tools/ci/check_public_headers.py index 427e8dacc0..dc209bd870 100644 --- a/tools/ci/check_public_headers.py +++ b/tools/ci/check_public_headers.py @@ -18,6 +18,7 @@ import subprocess import tempfile from io import open from threading import Event, Thread +from typing import List, Optional, Set, Tuple, Union class HeaderFailed(Exception): @@ -26,28 +27,28 @@ class HeaderFailed(Exception): class HeaderFailedSdkconfig(HeaderFailed): - def __str__(self): + def __str__(self) -> str: return 'Sdkconfig Error' class HeaderFailedBuildError(HeaderFailed): - def __str__(self): + def __str__(self) -> str: return 'Header Build Error' class HeaderFailedCppGuardMissing(HeaderFailed): - def __str__(self): + def __str__(self) -> str: return 'Header Missing C++ Guard' class HeaderFailedContainsCode(HeaderFailed): - def __str__(self): + def __str__(self) -> str: return 'Header Produced non-zero object' # Creates a temp file and returns both output as a string and a file name # -def exec_cmd_to_temp_file(what, suffix=''): +def exec_cmd_to_temp_file(what: List, suffix: str='') -> Tuple[int, str, str, str, str]: out_file = tempfile.NamedTemporaryFile(suffix=suffix, delete=False) rc, out, err, cmd = exec_cmd(what, out_file) with open(out_file.name, 'r', encoding='utf-8') as f: @@ -55,12 +56,12 @@ def exec_cmd_to_temp_file(what, suffix=''): return rc, out, err, out_file.name, cmd -def exec_cmd(what, out_file=subprocess.PIPE): +def exec_cmd(what: List, out_file: Union[tempfile._TemporaryFileWrapper[bytes], int]=subprocess.PIPE) -> Tuple[int, str, str, str]: p = subprocess.Popen(what, stdin=subprocess.PIPE, stdout=out_file, stderr=subprocess.PIPE) - output, err = p.communicate() + output_b, err_b = p.communicate() rc = p.returncode - output = output.decode('utf-8') if output is not None else None - err = err.decode('utf-8') if err is not None else None + output: str = output_b.decode('utf-8') if output is not None else '' + err: str = err_b.decode('utf-8') if err is not None else '' return rc, output, err, ' '.join(what) @@ -74,11 +75,11 @@ class PublicHeaderChecker: PREPROC_OUT_DIFFERENT_WITH_EXT_C_HDR_OK = 6 # -> Both preprocessors produce different, non-zero output with extern "C" (header seems OK) PREPROC_OUT_DIFFERENT_NO_EXT_C_HDR_FAILED = 7 # -> Both preprocessors produce different, non-zero output without extern "C" (header fails) - def log(self, message, debug=False): + def log(self, message: str, debug: bool=False) -> None: if self.verbose or debug: print(message) - def __init__(self, verbose=False, jobs=1, prefix=None): + def __init__(self, verbose: bool=False, jobs: int=1, prefix: Optional[str]=None) -> None: self.gcc = '{}gcc'.format(prefix) self.gpp = '{}g++'.format(prefix) self.verbose = verbose @@ -89,26 +90,26 @@ class PublicHeaderChecker: self.error_orphan_kconfig = re.compile(r'#error CONFIG_VARS_USED_WHILE_SDKCONFIG_NOT_INCLUDED') self.kconfig_macro = re.compile(r'\bCONFIG_[A-Z0-9_]+') self.assembly_nocode = r'^\s*(\.file|\.text|\.ident|\.option|\.attribute).*$' - self.check_threads = [] + self.check_threads: List[Thread] = [] - self.job_queue = queue.Queue() - self.failed_queue = queue.Queue() + self.job_queue: queue.Queue = queue.Queue() + self.failed_queue: queue.Queue = queue.Queue() self.terminate = Event() - def __enter__(self): + def __enter__(self) -> 'PublicHeaderChecker': for i in range(self.jobs): t = Thread(target=self.check_headers, args=(i, )) self.check_threads.append(t) t.start() return self - def __exit__(self, exc_type, exc_value, traceback): + def __exit__(self, exc_type: str, exc_value: str, traceback: str) -> None: self.terminate.set() for t in self.check_threads: t.join() # thread function process incoming header file from a queue - def check_headers(self, num): + def check_headers(self, num: int) -> None: while not self.terminate.is_set(): if not self.job_queue.empty(): task = self.job_queue.get() @@ -125,10 +126,10 @@ class PublicHeaderChecker: self.terminate.set() raise - def get_failed(self): + def get_failed(self) -> List: return list(self.failed_queue.queue) - def join(self): + def join(self) -> None: for t in self.check_threads: while t.is_alive() and not self.terminate.is_set(): t.join(1) # joins with timeout to respond to keyboard interrupt @@ -147,7 +148,7 @@ class PublicHeaderChecker: # - Fail the test if the preprocessor outputs are the same (but with some code) # - If outputs different, pass the test # 4) If header passed the steps 1) and 3) test that it produced zero assembly code - def check_one_header(self, header, num): + def check_one_header(self, header: str, num: int) -> None: res = self.preprocess_one_header(header, num) if res == self.COMPILE_ERR_REF_CONFIG_HDR_FAILED: raise HeaderFailedSdkconfig() @@ -173,7 +174,7 @@ class PublicHeaderChecker: if temp_header: os.unlink(temp_header) - def compile_one_header(self, header): + def compile_one_header(self, header: str) -> None: rc, out, err, cmd = exec_cmd([self.gcc, '-S', '-o-', '-include', header, self.main_c] + self.include_dir_flags) if rc == 0: if not re.sub(self.assembly_nocode, '', out, flags=re.M).isspace(): @@ -184,7 +185,7 @@ class PublicHeaderChecker: self.log('\nCompilation command failed:\n{}\n'.format(cmd), True) raise HeaderFailedBuildError() - def preprocess_one_header(self, header, num, ignore_sdkconfig_issue=False): + def preprocess_one_header(self, header: str, num: int, ignore_sdkconfig_issue: bool=False) -> int: all_compilation_flags = ['-w', '-P', '-E', '-DESP_PLATFORM', '-include', header, self.main_c] + self.include_dir_flags if not ignore_sdkconfig_issue: # just strip commnets to check for CONFIG_... macros @@ -232,8 +233,10 @@ class PublicHeaderChecker: pass # Get compilation data from an example to list all public header files - def list_public_headers(self, ignore_dirs, ignore_files, only_dir=None): + def list_public_headers(self, ignore_dirs: List, ignore_files: Union[List, Set], only_dir: str=None) -> None: idf_path = os.getenv('IDF_PATH') + if idf_path is None: + raise RuntimeError("Environment variable 'IDF_PATH' wasn't set.") project_dir = os.path.join(idf_path, 'examples', 'get-started', 'blink') build_dir = tempfile.mkdtemp() sdkconfig = os.path.join(build_dir, 'sdkconfig') @@ -283,22 +286,22 @@ class PublicHeaderChecker: self.include_dir_flags = include_dir_flags ignore_files = set(ignore_files) # processes public include files, removing ignored files - for f in all_include_files: - rel_path_file = os.path.relpath(f, idf_path) + for file_name in all_include_files: + rel_path_file = os.path.relpath(file_name, idf_path) if any([os.path.commonprefix([d, rel_path_file]) == d for d in ignore_dirs]): - self.log('{} - file ignored (inside ignore dir)'.format(f)) + self.log('{} - file ignored (inside ignore dir)'.format(file_name)) continue if rel_path_file in ignore_files: - self.log('{} - file ignored'.format(f)) + self.log('{} - file ignored'.format(file_name)) continue - files_to_check.append(f) + files_to_check.append(file_name) # removes duplicates and places headers to a work queue - for f in set(files_to_check): - self.job_queue.put(f) + for file_name in set(files_to_check): + self.job_queue.put(file_name) self.job_queue.put(None) # to indicate the last job -def check_all_headers(): +def check_all_headers() -> None: parser = argparse.ArgumentParser('Public header checker file', formatter_class=argparse.RawDescriptionHelpFormatter, epilog='''\ Tips for fixing failures reported by this script ------------------------------------------------ diff --git a/tools/ci/check_readme_links.py b/tools/ci/check_readme_links.py index e988534109..80ed216d54 100755 --- a/tools/ci/check_readme_links.py +++ b/tools/ci/check_readme_links.py @@ -16,6 +16,7 @@ import urllib.error import urllib.request from collections import defaultdict, namedtuple from pathlib import Path +from typing import List EXCLUDE_DOCS_LIST = ['examples/peripherals/secure_element/atecc608_ecdsa/components/esp-cryptoauthlib/cryptoauthlib/**'] @@ -26,28 +27,28 @@ Link = namedtuple('Link', ['file', 'url']) class ReadmeLinkError(Exception): - def __init__(self, file, url): + def __init__(self, file: str, url: str) -> None: self.file = file self.url = url class RelativeLinkError(ReadmeLinkError): - def __str__(self): + def __str__(self) -> str: return 'Relative link error, file - {} not found, linked from {}'.format(self.url, self.file) class UrlLinkError(ReadmeLinkError): - def __init__(self, file, url, error_code): + def __init__(self, file: str, url: str, error_code: str): self.error_code = error_code super().__init__(file, url) - def __str__(self): + def __str__(self) -> str: files = [str(f) for f in self.file] return 'URL error, url - {} in files - {} is not accessible, request returned {}'.format(self.url, ', '.join(files), self.error_code) # we do not want a failed test just due to bad network conditions, for non 404 errors we simply print a warning -def check_url(url, files, timeout): +def check_url(url: str, files: str, timeout: float) -> None: try: with urllib.request.urlopen(url, timeout=timeout): return @@ -60,7 +61,7 @@ def check_url(url, files, timeout): print('Unable to access {}, err = {}'.format(url, str(e))) -def check_web_links(web_links): +def check_web_links(web_links: defaultdict) -> List: with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: errors = [] @@ -74,7 +75,7 @@ def check_web_links(web_links): return errors -def check_file_links(file_links): +def check_file_links(file_links: List) -> List: errors = [] for link in file_links: @@ -87,10 +88,13 @@ def check_file_links(file_links): return errors -def get_md_links(folder): +def get_md_links(folder: str) -> List: MD_LINK_RE = r'\[.+?\]\((.+?)(#.+)?\)' - idf_path = Path(os.getenv('IDF_PATH')) + idf_path_str = os.getenv('IDF_PATH') + if idf_path_str is None: + raise RuntimeError("Environment variable 'IDF_PATH' wasn't set.") + idf_path = Path(idf_path_str) links = [] for path in (idf_path / folder).rglob('*.md'): @@ -110,7 +114,7 @@ def get_md_links(folder): return links -def check_readme_links(args): +def check_readme_links(args: argparse.Namespace) -> int: links = get_md_links('examples') print('Found {} links'.format(len(links))) diff --git a/tools/ci/check_rules_yml.py b/tools/ci/check_rules_yml.py index 2846b20f83..2f89fb8fed 100755 --- a/tools/ci/check_rules_yml.py +++ b/tools/ci/check_rules_yml.py @@ -12,7 +12,7 @@ import os import re import sys from copy import deepcopy -from typing import List +from typing import Any, Dict, List, Optional, Set, Union import yaml from idf_ci_utils import IDF_PATH @@ -20,20 +20,20 @@ from idf_ci_utils import IDF_PATH ROOT_YML_FP = os.path.join(IDF_PATH, '.gitlab-ci.yml') -def load_yaml(file_path): +def load_yaml(file_path: str) -> Any: return yaml.load(open(file_path), Loader=yaml.FullLoader) class YMLConfig: - def __init__(self, root_yml_file_path): - self._config = None - self._all_extends = None + def __init__(self, root_yml_file_path: str) -> None: + self._config: Optional[Dict] = None + self._all_extends: Optional[Set] = None self.root_yml = load_yaml(root_yml_file_path) assert self.root_yml @staticmethod - def _list(str_or_list): + def _list(str_or_list: Union[str, List]) -> List: if isinstance(str_or_list, str): return [str_or_list] if isinstance(str_or_list, list): @@ -43,7 +43,7 @@ class YMLConfig: ) @property - def config(self): + def config(self) -> Dict: if self._config: return self._config @@ -54,7 +54,7 @@ class YMLConfig: return self._config @property - def all_extends(self): + def all_extends(self) -> Set: if self._all_extends: return self._all_extends @@ -67,7 +67,7 @@ class YMLConfig: self._all_extends = res return self._all_extends - def exists(self, key): + def exists(self, key: str) -> bool: if key in self.all_extends: return True return False @@ -76,7 +76,7 @@ class YMLConfig: YML_CONFIG = YMLConfig(ROOT_YML_FP) -def validate_needed_rules(rules_yml): +def validate_needed_rules(rules_yml: os.PathLike[str]) -> int: res = 0 needed_rules = deepcopy(YML_CONFIG.all_extends) with open(rules_yml) as fr: @@ -114,7 +114,7 @@ def parse_submodule_paths( return res -def validate_submodule_patterns(): +def validate_submodule_patterns() -> int: submodule_paths = sorted(['.gitmodules'] + parse_submodule_paths()) submodule_paths_in_patterns = sorted( YML_CONFIG.config.get('.patterns-submodule', []) diff --git a/tools/ci/check_tools_files_patterns.py b/tools/ci/check_tools_files_patterns.py index 1244449c14..9b6407f451 100755 --- a/tools/ci/check_tools_files_patterns.py +++ b/tools/ci/check_tools_files_patterns.py @@ -7,19 +7,20 @@ import argparse import os import sys from pathlib import Path +from typing import Set, Tuple import yaml from idf_ci_utils import IDF_PATH, get_git_files -def check(pattern_yml, exclude_list): +def check(pattern_yml: str, exclude_list: str) -> Tuple[Set, Set]: rules_dict = yaml.load(open(pattern_yml), Loader=yaml.FullLoader) rules_patterns_set = set() for k, v in rules_dict.items(): if k.startswith('.pattern') and k != '.patterns-python-files' and isinstance(v, list): rules_patterns_set.update(v) - rules_files_set = set() + rules_files_set: Set = set() idf_path = Path(IDF_PATH) for pat in rules_patterns_set: rules_files_set.update(idf_path.glob(pat)) @@ -30,7 +31,7 @@ def check(pattern_yml, exclude_list): if pat: exclude_patterns_set.add(pat) - exclude_files_set = set() + exclude_files_set: Set = set() for pat in exclude_patterns_set: exclude_files_set.update(idf_path.glob(pat)) diff --git a/tools/ci/checkout_project_ref.py b/tools/ci/checkout_project_ref.py index 6bf2a4456b..10af27c760 100755 --- a/tools/ci/checkout_project_ref.py +++ b/tools/ci/checkout_project_ref.py @@ -10,27 +10,28 @@ import json import os import re import subprocess +from typing import List IDF_GIT_DESCRIBE_PATTERN = re.compile(r'^v(\d)\.(\d)') RETRY_COUNT = 3 -def get_customized_project_revision(proj_name): +def get_customized_project_revision(proj_name: str) -> str: """ get customized project revision defined in bot message """ revision = '' - customized_project_revisions = os.getenv('BOT_CUSTOMIZED_REVISION') - if customized_project_revisions: - customized_project_revisions = json.loads(customized_project_revisions) - try: - revision = customized_project_revisions[proj_name.lower()] - except (KeyError, TypeError): - pass + customized_project_revisions_file = os.getenv('BOT_CUSTOMIZED_REVISION') + if customized_project_revisions_file: + customized_project_revisions = json.loads(customized_project_revisions_file) + try: + revision = customized_project_revisions[proj_name.lower()] + except (KeyError, TypeError): + pass return revision -def target_branch_candidates(proj_name): +def target_branch_candidates(proj_name: str) -> List: """ :return: a list of target branch candidates, from highest priority to lowest priority. """ diff --git a/tools/ci/ci_fetch_submodule.py b/tools/ci/ci_fetch_submodule.py index 5961e12522..821ce93caf 100644 --- a/tools/ci/ci_fetch_submodule.py +++ b/tools/ci/ci_fetch_submodule.py @@ -11,6 +11,7 @@ import re import shutil import subprocess import time +from typing import Any, List import gitlab_api @@ -28,27 +29,26 @@ class SubModule(object): GIT_LS_TREE_OUTPUT_PATTERN = re.compile(r'\d+\s+commit\s+([0-9a-f]+)\s+') - def __init__(self, gitlab_inst, path, url): + def __init__(self, gitlab_inst: gitlab_api.Gitlab, path: str, url: str) -> None: self.path = path self.url = url self.gitlab_inst = gitlab_inst self.project_id = self._get_project_id(url) self.commit_id = self._get_commit_id(path) - def _get_commit_id(self, path): - output = subprocess.check_output(['git', 'ls-tree', 'HEAD', path]) - output = output.decode() + def _get_commit_id(self, path: str) -> str: + output = subprocess.check_output(['git', 'ls-tree', 'HEAD', path]).decode() # example output: 160000 commit d88a262fbdf35e5abb372280eb08008749c3faa0 components/esp_wifi/lib match = self.GIT_LS_TREE_OUTPUT_PATTERN.search(output) - return match.group(1) + return match.group(1) if match is not None else '' - def _get_project_id(self, url): + def _get_project_id(self, url: str) -> Any: base_name = os.path.basename(url) project_id = self.gitlab_inst.get_project_id(os.path.splitext(base_name)[0], # remove .git namespace='espressif') return project_id - def download_archive(self): + def download_archive(self) -> None: print('Update submodule: {}: {}'.format(self.path, self.commit_id)) path_name = self.gitlab_inst.download_archive(self.commit_id, SUBMODULE_ARCHIVE_TEMP_FOLDER, self.project_id, SUBMODULE_ARCHIVE_CACHE_DIR) @@ -58,33 +58,34 @@ class SubModule(object): shutil.move(renamed_path, os.path.dirname(self.path)) -def update_submodule(git_module_file, submodules_to_update): +def update_submodule(git_module_file: str, submodules_to_update: List) -> None: gitlab_inst = gitlab_api.Gitlab() submodules = [] with open(git_module_file, 'r') as f: data = f.read() match = SUBMODULE_PATTERN.search(data) - while True: - next_match = SUBMODULE_PATTERN.search(data, pos=match.end()) - if next_match: - end_pos = next_match.start() - else: - end_pos = len(data) - path_match = PATH_PATTERN.search(data, pos=match.end(), endpos=end_pos) - url_match = URL_PATTERN.search(data, pos=match.end(), endpos=end_pos) - path = path_match.group(1) - url = url_match.group(1) + if match is not None: + while True: + next_match = SUBMODULE_PATTERN.search(data, pos=match.end()) + if next_match: + end_pos = next_match.start() + else: + end_pos = len(data) + path_match = PATH_PATTERN.search(data, pos=match.end(), endpos=end_pos) + url_match = URL_PATTERN.search(data, pos=match.end(), endpos=end_pos) + path = path_match.group(1) if path_match is not None else '' + url = url_match.group(1) if url_match is not None else '' - filter_result = True - if submodules_to_update: - if path not in submodules_to_update: - filter_result = False - if filter_result: - submodules.append(SubModule(gitlab_inst, path, url)) + filter_result = True + if submodules_to_update: + if path not in submodules_to_update: + filter_result = False + if filter_result: + submodules.append(SubModule(gitlab_inst, path, url)) - match = next_match - if not match: - break + match = next_match + if not match: + break shutil.rmtree(SUBMODULE_ARCHIVE_TEMP_FOLDER, ignore_errors=True) diff --git a/tools/ci/deploy_docs.py b/tools/ci/deploy_docs.py index b9f61240b0..954958e738 100755 --- a/tools/ci/deploy_docs.py +++ b/tools/ci/deploy_docs.py @@ -14,11 +14,12 @@ import stat import subprocess import sys import tarfile +from typing import Any, List, Tuple import packaging.version -def env(variable, default=None): +def env(variable: str, default: str=None) -> str: """ Shortcut to return the expanded version of an environment variable """ return os.path.expandvars(os.environ.get(variable, default) if default else os.environ[variable]) @@ -28,7 +29,7 @@ sys.path.append(os.path.join(env('IDF_PATH'), 'docs')) from sanitize_version import sanitize_version # noqa -def main(): +def main() -> None: # if you get KeyErrors on the following lines, it's probably because you're not running in Gitlab CI git_ver = env('GIT_VER') # output of git describe --always ci_ver = env('CI_COMMIT_REF_NAME', git_ver) # branch or tag we're building for (used for 'release' & URL) @@ -87,8 +88,8 @@ def main(): deploy('stable', tarball_path, docs_path, docs_server) -def deploy(version, tarball_path, docs_path, docs_server): - def run_ssh(commands): +def deploy(version: str, tarball_path: str, docs_path: str, docs_server: str) -> None: + def run_ssh(commands: List) -> None: """ Log into docs_server and run a sequence of commands using ssh """ print('Running ssh: {}'.format(commands)) subprocess.run(['ssh', '-o', 'BatchMode=yes', docs_server, '-x', ' && '.join(commands)], check=True) @@ -110,7 +111,7 @@ def deploy(version, tarball_path, docs_path, docs_server): # another thing made much more complex by the directory structure putting language before version... -def build_doc_tarball(version, git_ver, build_dir): +def build_doc_tarball(version: str, git_ver: str, build_dir: str) -> Tuple[str, List]: """ Make a tar.gz archive of the docs, in the directory structure used to deploy as the given version """ version_paths = [] @@ -126,7 +127,8 @@ def build_doc_tarball(version, git_ver, build_dir): # add symlink for stable and latest and adds them to PDF blob symlinks = create_and_add_symlinks(version, git_ver, pdfs) - def not_sources_dir(ti): + def not_sources_dir(ti: Any) -> Any: + print(type(ti)) """ Filter the _sources directories out of the tarballs """ if ti.name.endswith('/_sources'): return None @@ -171,7 +173,7 @@ def build_doc_tarball(version, git_ver, build_dir): return (os.path.abspath(tarball_path), version_paths) -def create_and_add_symlinks(version, git_ver, pdfs): +def create_and_add_symlinks(version: str, git_ver: str, pdfs: List) -> List: """ Create symbolic links for PDFs for 'latest' and 'stable' releases """ symlinks = [] @@ -187,7 +189,7 @@ def create_and_add_symlinks(version, git_ver, pdfs): return symlinks -def is_stable_version(version): +def is_stable_version(version: str) -> bool: """ Heuristic for whether this is the latest stable release """ if not version.startswith('v'): return False # branch name @@ -197,11 +199,11 @@ def is_stable_version(version): git_out = subprocess.check_output(['git', 'tag', '-l']).decode('utf-8') versions = [v.strip() for v in git_out.split('\n')] - versions = [v for v in versions if re.match(r'^v[\d\.]+$', v)] # include vX.Y.Z only + versions = [v for v in versions if re.match(r'^v[\d\.]+$', v.strip())] # include vX.Y.Z only - versions = [packaging.version.parse(v) for v in versions] + versions_pack = [packaging.version.parse(v) for v in versions] - max_version = max(versions) + max_version = max(versions_pack) if max_version.public != version[1:]: print('Stable version is v{}. This version is {}.'.format(max_version.public, version)) diff --git a/tools/ci/envsubst.py b/tools/ci/envsubst.py index 8c9a0c7796..5ed285003f 100755 --- a/tools/ci/envsubst.py +++ b/tools/ci/envsubst.py @@ -9,7 +9,7 @@ import os import sys -def main(): +def main() -> None: # Sanitize environment variables vars_to_remove = [] for var_name in os.environ.keys(): diff --git a/tools/ci/mypy_ignore_list.txt b/tools/ci/mypy_ignore_list.txt index e2f8f4bf29..b0e37ec659 100644 --- a/tools/ci/mypy_ignore_list.txt +++ b/tools/ci/mypy_ignore_list.txt @@ -141,21 +141,7 @@ examples/wifi/iperf/iperf_test.py tools/ble/lib_ble_client.py tools/ble/lib_gap.py tools/ble/lib_gatt.py -tools/ci/check_artifacts_expire_time.py -tools/ci/check_callgraph.py -tools/ci/check_codeowners.py -tools/ci/check_deprecated_kconfigs.py -tools/ci/check_executables.py tools/ci/check_kconfigs.py -tools/ci/check_public_headers.py -tools/ci/check_readme_links.py -tools/ci/check_rules_yml.py -tools/ci/check_tools_files_patterns.py -tools/ci/checkout_project_ref.py -tools/ci/ci_fetch_submodule.py -tools/ci/deploy_docs.py -tools/ci/envsubst.py -tools/ci/normalize_clangtidy_path.py tools/ci/python_packages/idf_http_server_test/adder.py tools/ci/python_packages/idf_http_server_test/client.py tools/ci/python_packages/idf_http_server_test/test.py