Merge branch 'refactor/add_types_to_tools' into 'master'

tools: add python types hints

See merge request espressif/esp-idf!18434
This commit is contained in:
Roland Dobai 2022-06-17 17:08:19 +08:00
commit 60f845384f
12 changed files with 210 additions and 217 deletions

View File

@ -211,15 +211,7 @@ tools/find_apps.py
tools/find_build_apps/common.py
tools/gen_esp_err_to_name.py
tools/gen_soc_caps_kconfig/test/test_gen_soc_caps_kconfig.py
tools/idf.py
tools/idf_py_actions/core_ext.py
tools/idf_py_actions/create_ext.py
tools/idf_py_actions/debug_ext.py
tools/idf_py_actions/dfu_ext.py
tools/idf_py_actions/errors.py
tools/idf_py_actions/serial_ext.py
tools/idf_py_actions/tools.py
tools/idf_py_actions/uf2_ext.py
tools/kconfig_new/confgen.py
tools/kconfig_new/confserver.py
tools/kconfig_new/gen_kconfig_doc.py
@ -240,7 +232,6 @@ tools/ldgen/test/test_fragments.py
tools/ldgen/test/test_generation.py
tools/ldgen/test/test_output_commands.py
tools/mass_mfg/mfg_gen.py
tools/mkuf2.py
tools/test_apps/build_system/ldgen_test/check_placements.py
tools/test_apps/protocols/mqtt/publish_connect_test/app_test.py
tools/test_apps/protocols/openssl/app_test.py

View File

@ -6,6 +6,7 @@
from __future__ import print_function, unicode_literals
import sys
from typing import Any, List, Optional, TextIO
try:
from builtins import object, range, str
@ -57,7 +58,7 @@ class ErrItem(object):
- rel_str - (optional) error string which is a base for the error
- rel_off - (optional) offset in relation to the base error
"""
def __init__(self, name, file, include_as=None, comment='', rel_str='', rel_off=0):
def __init__(self, name: str, file: str, include_as: Optional[Any]=None, comment: str='', rel_str: str='', rel_off: int=0) -> None:
self.name = name
self.file = file
self.include_as = include_as
@ -65,7 +66,7 @@ class ErrItem(object):
self.rel_str = rel_str
self.rel_off = rel_off
def __str__(self):
def __str__(self) -> str:
ret = self.name + ' from ' + self.file
if (self.rel_str != ''):
ret += ' is (' + self.rel_str + ' + ' + str(self.rel_off) + ')'
@ -73,7 +74,7 @@ class ErrItem(object):
ret += ' // ' + self.comment
return ret
def __cmp__(self, other):
def __cmp__(self, other) -> int:
if self.file in priority_headers and other.file not in priority_headers:
return -1
elif self.file not in priority_headers and other.file in priority_headers:
@ -101,11 +102,11 @@ class InputError(RuntimeError):
"""
Represents and error on the input
"""
def __init__(self, p, e):
def __init__(self, p: str, e: str) -> None:
super(InputError, self).__init__(p + ': ' + e)
def process(line, idf_path, include_as):
def process(line: str, idf_path: str, include_as: Any) -> None:
"""
Process a line of text from file idf_path (relative to IDF project).
Fills the global list unproc_list and dictionaries err_dict, rev_err_dict
@ -168,7 +169,7 @@ def process(line, idf_path, include_as):
unproc_list.append(ErrItem(words[1], idf_path, include_as, comment, related, num))
def process_remaining_errors():
def process_remaining_errors() -> None:
"""
Create errors which could not be processed before because the error code
for the BASE error code wasn't known.
@ -189,7 +190,7 @@ def process_remaining_errors():
del unproc_list[:]
def path_to_include(path):
def path_to_include(path: str) -> str:
"""
Process the path (relative to the IDF project) in a form which can be used
to include in a C file. Using just the filename does not work all the
@ -210,7 +211,7 @@ def path_to_include(path):
return os.sep.join(spl_path[i + 1:]) # subdirectories and filename in "include"
def print_warning(error_list, error_code):
def print_warning(error_list: List, error_code: int) -> None:
"""
Print warning about errors with the same error code
"""
@ -219,7 +220,7 @@ def print_warning(error_list, error_code):
print(' ' + str(e))
def max_string_width():
def max_string_width() -> int:
max = 0
for k in err_dict:
for e in err_dict[k]:
@ -229,7 +230,7 @@ def max_string_width():
return max
def generate_c_output(fin, fout):
def generate_c_output(fin: TextIO, fout: TextIO) -> None:
"""
Writes the output to fout based on th error dictionary err_dict and
template file fin.
@ -294,7 +295,7 @@ def generate_c_output(fin, fout):
fout.write(line)
def generate_rst_output(fout):
def generate_rst_output(fout: TextIO) -> None:
for k in sorted(err_dict.keys()):
v = err_dict[k][0]
fout.write(':c:macro:`{}` '.format(v.name))
@ -307,7 +308,7 @@ def generate_rst_output(fout):
fout.write('\n\n')
def main():
def main() -> None:
if 'IDF_PATH' in os.environ:
idf_path = os.environ['IDF_PATH']
else:

View File

@ -13,7 +13,7 @@
# check_environment() function below. If possible, avoid importing
# any external libraries here - put in external script, or import in
# their specific function instead.
from __future__ import print_function
from __future__ import annotations
import codecs
import json
@ -23,9 +23,11 @@ import os.path
import signal
import subprocess
import sys
from collections import Counter, OrderedDict
from collections import Counter, OrderedDict, _OrderedDictKeysView
from importlib import import_module
from pkgutil import iter_modules
from types import FrameType
from typing import Any, Callable, Dict, List, Optional, TextIO, Union
# pyc files remain in the filesystem when switching between branches which might raise errors for incompatible
# idf.py extensions. Therefore, pyc file generation is turned off:
@ -35,7 +37,8 @@ import python_version_checker # noqa: E402
try:
from idf_py_actions.errors import FatalError # noqa: E402
from idf_py_actions.tools import executable_exists, idf_version, merge_action_lists, realpath # noqa: E402
from idf_py_actions.tools import (PropertyDict, executable_exists, idf_version, merge_action_lists, # noqa: E402
realpath)
except ImportError:
# For example, importing click could cause this.
print('Please use idf.py only in an ESP-IDF shell environment.', file=sys.stderr)
@ -61,12 +64,12 @@ SHELL_COMPLETE_RUN = SHELL_COMPLETE_VAR in os.environ
# function prints warning when autocompletion is not being performed
# set argument stream to sys.stderr for errors and exceptions
def print_warning(message, stream=None):
def print_warning(message: str, stream: TextIO=None) -> None:
if not SHELL_COMPLETE_RUN:
print(message, file=stream or sys.stderr)
def check_environment():
def check_environment() -> List:
"""
Verify the environment contains the top-level tools we need to operate
@ -121,7 +124,7 @@ def check_environment():
return checks_output
def _safe_relpath(path, start=None):
def _safe_relpath(path: str, start: Optional[str]=None) -> str:
""" Return a relative path, same as os.path.relpath, but only if this is possible.
It is not possible on Windows, if the start directory and the path are on different drives.
@ -132,7 +135,7 @@ def _safe_relpath(path, start=None):
return os.path.abspath(path)
def debug_print_idf_version():
def debug_print_idf_version() -> None:
version = idf_version()
if version:
print_warning('ESP-IDF %s' % version)
@ -140,30 +143,13 @@ def debug_print_idf_version():
print_warning('ESP-IDF version unknown')
class PropertyDict(dict):
def __getattr__(self, name):
if name in self:
return self[name]
else:
raise AttributeError("'PropertyDict' object has no attribute '%s'" % name)
def __setattr__(self, name, value):
self[name] = value
def __delattr__(self, name):
if name in self:
del self[name]
else:
raise AttributeError("'PropertyDict' object has no attribute '%s'" % name)
def init_cli(verbose_output=None):
def init_cli(verbose_output: List=None) -> Any:
# Click is imported here to run it after check_environment()
import click
class Deprecation(object):
"""Construct deprecation notice for help messages"""
def __init__(self, deprecated=False):
def __init__(self, deprecated: Union[Dict, str, bool]=False) -> None:
self.deprecated = deprecated
self.since = None
self.removed = None
@ -178,7 +164,7 @@ def init_cli(verbose_output=None):
elif isinstance(deprecated, str):
self.custom_message = deprecated
def full_message(self, type='Option'):
def full_message(self, type: str='Option') -> str:
if self.exit_with_error:
return '%s is deprecated %sand was removed%s.%s' % (
type,
@ -194,15 +180,15 @@ def init_cli(verbose_output=None):
' %s' % self.custom_message if self.custom_message else '',
)
def help(self, text, type='Option', separator=' '):
def help(self, text: str, type: str='Option', separator: str=' ') -> str:
text = text or ''
return self.full_message(type) + separator + text if self.deprecated else text
def short_help(self, text):
def short_help(self, text: str) -> str:
text = text or ''
return ('Deprecated! ' + text) if self.deprecated else text
def check_deprecation(ctx):
def check_deprecation(ctx: click.core.Context) -> None:
"""Prints deprecation warnings for arguments in given context"""
for option in ctx.command.params:
default = () if option.multiple else option.default
@ -214,7 +200,8 @@ def init_cli(verbose_output=None):
print_warning('Warning: %s' % deprecation.full_message('Option "%s"' % option.name))
class Task(object):
def __init__(self, callback, name, aliases, dependencies, order_dependencies, action_args):
def __init__(self, callback: Callable, name: str, aliases: List, dependencies: Optional[List],
order_dependencies: Optional[List], action_args: Dict) -> None:
self.callback = callback
self.name = name
self.dependencies = dependencies
@ -222,7 +209,7 @@ def init_cli(verbose_output=None):
self.action_args = action_args
self.aliases = aliases
def __call__(self, context, global_args, action_args=None):
def __call__(self, context: click.core.Context, global_args: PropertyDict, action_args: Dict=None) -> None:
if action_args is None:
action_args = self.action_args
@ -231,26 +218,24 @@ def init_cli(verbose_output=None):
class Action(click.Command):
def __init__(
self,
name=None,
aliases=None,
deprecated=False,
dependencies=None,
order_dependencies=None,
hidden=False,
**kwargs):
name: Optional[str]=None,
aliases: Optional[List]=None,
deprecated: Union[Dict, str, bool]=False,
dependencies: Optional[List]=None,
order_dependencies: Optional[List]=None,
hidden: bool=False,
**kwargs: Any) -> None:
super(Action, self).__init__(name, **kwargs)
self.name = self.name or self.callback.__name__
self.deprecated = deprecated
self.hidden = hidden
self.name: str = self.name or self.callback.__name__
self.deprecated: Union[Dict, str, bool] = deprecated
self.hidden: bool = hidden
if aliases is None:
aliases = []
self.aliases = aliases
self.help = self.help or self.callback.__doc__
if self.help is None:
self.help = ''
self.help: str = self.help or self.callback.__doc__ or ''
if dependencies is None:
dependencies = []
@ -259,7 +244,7 @@ def init_cli(verbose_output=None):
order_dependencies = []
# Show first line of help if short help is missing
self.short_help = self.short_help or self.help.split('\n')[0]
self.short_help: str = self.short_help or self.help.split('\n')[0]
if deprecated:
deprecation = Deprecation(deprecated)
@ -276,7 +261,7 @@ def init_cli(verbose_output=None):
self.unwrapped_callback = self.callback
if self.callback is not None:
def wrapped_callback(**action_args):
def wrapped_callback(**action_args: Any) -> Task:
return Task(
callback=self.unwrapped_callback,
name=self.name,
@ -288,7 +273,7 @@ def init_cli(verbose_output=None):
self.callback = wrapped_callback
def invoke(self, ctx):
def invoke(self, ctx: click.core.Context) -> click.core.Context:
if self.deprecated:
deprecation = Deprecation(self.deprecated)
message = deprecation.full_message('Command "%s"' % self.name)
@ -310,7 +295,7 @@ def init_cli(verbose_output=None):
names - alias of 'param_decls'
"""
def __init__(self, **kwargs):
def __init__(self, **kwargs: str):
names = kwargs.pop('names')
super(Argument, self).__init__(names, **kwargs)
@ -325,7 +310,7 @@ def init_cli(verbose_output=None):
SCOPES = ('default', 'global', 'shared')
def __init__(self, scope=None):
def __init__(self, scope: Union['Scope', str]=None) -> None:
if scope is None:
self._scope = 'default'
elif isinstance(scope, str) and scope in self.SCOPES:
@ -336,19 +321,19 @@ def init_cli(verbose_output=None):
raise FatalError('Unknown scope for option: %s' % scope)
@property
def is_global(self):
def is_global(self) -> bool:
return self._scope == 'global'
@property
def is_shared(self):
def is_shared(self) -> bool:
return self._scope == 'shared'
def __str__(self):
def __str__(self) -> str:
return self._scope
class Option(click.Option):
"""Option that knows whether it should be global"""
def __init__(self, scope=None, deprecated=False, hidden=False, **kwargs):
def __init__(self, scope: Union[Scope, str]=None, deprecated: Union[Dict, str, bool]=False, hidden: bool=False, **kwargs: str) -> None:
"""
Keyword arguments additional to Click's Option class:
@ -369,7 +354,7 @@ def init_cli(verbose_output=None):
if deprecated:
deprecation = Deprecation(deprecated)
self.help = deprecation.help(self.help)
self.help: str = deprecation.help(self.help)
if self.envvar:
self.help += ' The default value can be set with the %s environment variable.' % self.envvar
@ -377,16 +362,16 @@ def init_cli(verbose_output=None):
if self.scope.is_global:
self.help += ' This option can be used at most once either globally, or for one subcommand.'
def get_help_record(self, ctx):
def get_help_record(self, ctx: click.core.Context) -> Any:
# Backport "hidden" parameter to click 5.0
if self.hidden:
return
return None
return super(Option, self).get_help_record(ctx)
class CLI(click.MultiCommand):
"""Action list contains all actions with options available for CLI"""
def __init__(self, all_actions=None, verbose_output=None, help=None):
def __init__(self, all_actions: Dict=None, verbose_output: List=None, help: str=None) -> None:
super(CLI, self).__init__(
chain=True,
invoke_without_command=True,
@ -455,18 +440,21 @@ def init_cli(verbose_output=None):
self._actions[name].params.append(option)
def list_commands(self, ctx):
def list_commands(self, ctx: click.core.Context) -> List:
return sorted(filter(lambda name: not self._actions[name].hidden, self._actions))
def get_command(self, ctx, name):
def get_command(self, ctx: click.core.Context, name: str) -> Optional[Action]:
if name in self.commands_with_aliases:
return self._actions.get(self.commands_with_aliases.get(name))
# Trying fallback to build target (from "all" action) if command is not known
else:
return Action(name=name, callback=self._actions.get('fallback').unwrapped_callback)
callback = self._actions.get('fallback')
if callback:
return Action(name=name, callback=callback.unwrapped_callback)
return None
def _print_closing_message(self, args, actions):
def _print_closing_message(self, args: PropertyDict, actions: _OrderedDictKeysView) -> None:
# print a closing message of some kind
#
if any(t in str(actions) for t in ('flash', 'dfu', 'uf2', 'uf2-app')):
@ -479,11 +467,13 @@ def init_cli(verbose_output=None):
# Otherwise, if we built any binaries print a message about
# how to flash them
def print_flashing_message(title, key):
with open(os.path.join(args.build_dir, 'flasher_args.json')) as f:
flasher_args = json.load(f)
def print_flashing_message(title: str, key: str) -> None:
with open(os.path.join(args.build_dir, 'flasher_args.json')) as file:
flasher_args: Dict[str, Any] = json.load(file)
def flasher_path(f):
def flasher_path(f: Union[str, os.PathLike[str]]) -> str:
if type(args.build_dir) is bytes:
args.build_dir = args.build_dir.decode()
return _safe_relpath(os.path.join(args.build_dir, f))
if key != 'project': # flashing a single item
@ -536,11 +526,11 @@ def init_cli(verbose_output=None):
if 'bootloader' in actions:
print_flashing_message('Bootloader', 'bootloader')
def execute_tasks(self, tasks, **kwargs):
def execute_tasks(self, tasks: List, **kwargs: str) -> OrderedDict:
ctx = click.get_current_context()
global_args = PropertyDict(kwargs)
def _help_and_exit():
def _help_and_exit() -> None:
print(ctx.get_help())
ctx.exit()
@ -592,7 +582,7 @@ def init_cli(verbose_output=None):
_help_and_exit()
# Build full list of tasks to and deal with dependencies and order dependencies
tasks_to_run = OrderedDict()
tasks_to_run: OrderedDict = OrderedDict()
while tasks:
task = tasks[0]
tasks_dict = dict([(t.name, t) for t in tasks])
@ -661,13 +651,13 @@ def init_cli(verbose_output=None):
},
)
@click.option('-C', '--project-dir', default=os.getcwd(), type=click.Path())
def parse_project_dir(project_dir):
def parse_project_dir(project_dir: str) -> Any:
return realpath(project_dir)
# Set `complete_var` to not existing environment variable name to prevent early cmd completion
project_dir = parse_project_dir(standalone_mode=False, complete_var='_IDF.PY_COMPLETE_NOT_EXISTING')
all_actions = {}
all_actions: Dict = {}
# Load extensions from components dir
idf_py_extensions_path = os.path.join(os.environ['IDF_PATH'], 'tools', 'idf_py_actions')
extension_dirs = [realpath(idf_py_extensions_path)]
@ -730,12 +720,12 @@ def init_cli(verbose_output=None):
return CLI(help=cli_help, verbose_output=verbose_output, all_actions=all_actions)
def signal_handler(_signal, _frame):
def signal_handler(_signal: int, _frame: Optional[FrameType]) -> None:
# The Ctrl+C processed by other threads inside
pass
def main():
def main() -> None:
# Processing of Ctrl+C event for all threads made by main()
signal.signal(signal.SIGINT, signal_handler)
@ -753,7 +743,7 @@ def main():
cli(sys.argv[1:], prog_name=PROG, complete_var=SHELL_COMPLETE_VAR)
def _valid_unicode_config():
def _valid_unicode_config() -> Union[codecs.CodecInfo, bool]:
# Python 2 is always good
if sys.version_info[0] == 2:
return True
@ -765,15 +755,13 @@ def _valid_unicode_config():
return False
def _find_usable_locale():
def _find_usable_locale() -> str:
try:
locales = subprocess.Popen(['locale', '-a'], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
locales = subprocess.Popen(['locale', '-a'], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0].decode('ascii', 'replace')
except OSError:
locales = ''
if isinstance(locales, bytes):
locales = locales.decode('ascii', 'replace')
usable_locales = []
usable_locales: List[str] = []
for line in locales.splitlines():
locale = line.strip()
locale_name = locale.lower().replace('-', '')

View File

@ -7,20 +7,22 @@ import re
import shutil
import subprocess
import sys
from typing import Any, Dict, List, Optional
from urllib.error import URLError
from urllib.request import Request, urlopen
from webbrowser import open_new_tab
import click
from click.core import Context
from idf_py_actions.constants import GENERATORS, PREVIEW_TARGETS, SUPPORTED_TARGETS, URL_TO_DOC
from idf_py_actions.errors import FatalError
from idf_py_actions.global_options import global_options
from idf_py_actions.tools import (TargetChoice, ensure_build_directory, get_target, idf_version, merge_action_lists,
realpath, run_target)
from idf_py_actions.tools import (PropertyDict, TargetChoice, ensure_build_directory, get_target, idf_version,
merge_action_lists, realpath, run_target)
def action_extensions(base_actions, project_path):
def build_target(target_name, ctx, args):
def action_extensions(base_actions: Dict, project_path: str) -> Any:
def build_target(target_name: str, ctx: Context, args: PropertyDict) -> None:
"""
Execute the target build system to build target 'target_name'
@ -30,7 +32,7 @@ def action_extensions(base_actions, project_path):
ensure_build_directory(args, ctx.info_name)
run_target(target_name, args)
def size_target(target_name, ctx, args):
def size_target(target_name: str, ctx: Context, args: PropertyDict) -> None:
"""
Builds the app and then executes a size-related target passed in 'target_name'.
`tool_error_handler` handler is used to suppress errors during the build,
@ -38,18 +40,18 @@ def action_extensions(base_actions, project_path):
"""
def tool_error_handler(e):
def tool_error_handler(e: int) -> None:
pass
ensure_build_directory(args, ctx.info_name)
run_target('all', args, custom_error_handler=tool_error_handler)
run_target(target_name, args)
def list_build_system_targets(target_name, ctx, args):
def list_build_system_targets(target_name: str, ctx: Context, args: PropertyDict) -> None:
"""Shows list of targets known to build sytem (make/ninja)"""
build_target('help', ctx, args)
def menuconfig(target_name, ctx, args, style):
def menuconfig(target_name: str, ctx: Context, args: PropertyDict, style: str) -> None:
"""
Menuconfig target is build_target extended with the style argument for setting the value for the environment
variable.
@ -61,7 +63,7 @@ def action_extensions(base_actions, project_path):
os.environ['MENUCONFIG_STYLE'] = style
build_target(target_name, ctx, args)
def fallback_target(target_name, ctx, args):
def fallback_target(target_name: str, ctx: Context, args: PropertyDict) -> None:
"""
Execute targets that are not explicitly known to idf.py
"""
@ -80,42 +82,22 @@ def action_extensions(base_actions, project_path):
run_target(target_name, args)
def verbose_callback(ctx, param, value):
def verbose_callback(ctx: Context, param: List, value: str) -> Optional[str]:
if not value or ctx.resilient_parsing:
return
return None
for line in ctx.command.verbose_output:
print(line)
return value
def clean(action, ctx, args):
def clean(action: str, ctx: Context, args: PropertyDict) -> None:
if not os.path.isdir(args.build_dir):
print("Build directory '%s' not found. Nothing to clean." % args.build_dir)
return
build_target('clean', ctx, args)
def _delete_windows_symlinks(directory):
"""
It deletes symlinks recursively on Windows. It is useful for Python 2 which doesn't detect symlinks on Windows.
"""
deleted_paths = []
if os.name == 'nt':
import ctypes
for root, dirnames, _filenames in os.walk(directory):
for d in dirnames:
full_path = os.path.join(root, d)
try:
full_path = full_path.decode('utf-8')
except Exception:
pass
if ctypes.windll.kernel32.GetFileAttributesW(full_path) & 0x0400:
os.rmdir(full_path)
deleted_paths.append(full_path)
return deleted_paths
def fullclean(action, ctx, args):
def fullclean(action: str, ctx: Context, args: PropertyDict) -> None:
build_dir = args.build_dir
if not os.path.isdir(build_dir):
print("Build directory '%s' not found. Nothing to clean." % build_dir)
@ -135,13 +117,8 @@ def action_extensions(base_actions, project_path):
raise FatalError(
"Refusing to automatically delete files in directory containing '%s'. Delete files manually if you're sure."
% red)
# OK, delete everything in the build directory...
# Note: Python 2.7 doesn't detect symlinks on Windows (it is supported form 3.2). Tools promising to not
# follow symlinks will actually follow them. Deleting the build directory with symlinks deletes also items
# outside of this directory.
deleted_symlinks = _delete_windows_symlinks(build_dir)
if args.verbose and len(deleted_symlinks) > 1:
print('The following symlinks were identified and removed:\n%s' % '\n'.join(deleted_symlinks))
if args.verbose and len(build_dir) > 1:
print('The following symlinks were identified and removed:\n%s' % '\n'.join(build_dir))
for f in os.listdir(build_dir): # TODO: once we are Python 3 only, this can be os.scandir()
f = os.path.join(build_dir, f)
if args.verbose:
@ -151,7 +128,7 @@ def action_extensions(base_actions, project_path):
else:
os.remove(f)
def python_clean(action, ctx, args):
def python_clean(action: str, ctx: Context, args: PropertyDict) -> None:
for root, dirnames, filenames in os.walk(os.environ['IDF_PATH']):
for d in dirnames:
if d == '__pycache__':
@ -165,7 +142,7 @@ def action_extensions(base_actions, project_path):
print('Removing: %s' % file_to_delete)
os.remove(file_to_delete)
def set_target(action, ctx, args, idf_target):
def set_target(action: str, ctx: Context, args: PropertyDict, idf_target: str) -> None:
if (not args['preview'] and idf_target in PREVIEW_TARGETS):
raise FatalError(
"%s is still in preview. You have to append '--preview' option after idf.py to use any preview feature."
@ -180,10 +157,10 @@ def action_extensions(base_actions, project_path):
print('Set Target to: %s, new sdkconfig created. Existing sdkconfig renamed to sdkconfig.old.' % idf_target)
ensure_build_directory(args, ctx.info_name, True)
def reconfigure(action, ctx, args):
def reconfigure(action: str, ctx: Context, args: PropertyDict) -> None:
ensure_build_directory(args, ctx.info_name, True)
def validate_root_options(ctx, args, tasks):
def validate_root_options(ctx: Context, args: PropertyDict, tasks: List) -> None:
args.project_dir = realpath(args.project_dir)
if args.build_dir is not None and args.project_dir == realpath(args.build_dir):
raise FatalError(
@ -193,7 +170,7 @@ def action_extensions(base_actions, project_path):
args.build_dir = os.path.join(args.project_dir, 'build')
args.build_dir = realpath(args.build_dir)
def idf_version_callback(ctx, param, value):
def idf_version_callback(ctx: Context, param: str, value: str) -> None:
if not value or ctx.resilient_parsing:
return
@ -205,7 +182,7 @@ def action_extensions(base_actions, project_path):
print('ESP-IDF %s' % version)
sys.exit(0)
def list_targets_callback(ctx, param, value):
def list_targets_callback(ctx: Context, param: List, value: int) -> None:
if not value or ctx.resilient_parsing:
return
@ -218,12 +195,13 @@ def action_extensions(base_actions, project_path):
sys.exit(0)
def show_docs(action, ctx, args, no_browser, language, starting_page, version, target):
def show_docs(action: str, ctx: Context, args: PropertyDict, no_browser: bool, language: str, starting_page: str, version: str, target: str) -> None:
if language == 'cn':
language = 'zh_CN'
if not version:
# '0.0-dev' here because if 'dev' in version it will transform in to 'latest'
version = re.search(r'v\d+\.\d+\.?\d*(-dev|-beta\d|-rc)?', idf_version() or '0.0-dev').group()
version_search = re.search(r'v\d+\.\d+\.?\d*(-dev|-beta\d|-rc)?', idf_version() or '0.0-dev')
version = version_search.group() if version_search else 'latest'
if 'dev' in version:
version = 'latest'
elif version[0] != 'v':
@ -249,7 +227,7 @@ def action_extensions(base_actions, project_path):
print(link)
sys.exit(0)
def get_default_language():
def get_default_language() -> str:
try:
language = 'zh_CN' if locale.getdefaultlocale()[0] == 'zh_CN' else 'en'
except ValueError:

View File

@ -6,13 +6,17 @@ import os
import re
import sys
from distutils.dir_util import copy_tree
from typing import Dict
import click
from idf_py_actions.tools import PropertyDict
def get_type(action):
def get_type(action: str) -> str:
return action.split('-')[1]
def replace_in_file(filename, pattern, replacement):
def replace_in_file(filename: str, pattern: str, replacement: str) -> None:
with open(filename, 'r+') as f:
content = f.read()
overwritten_content = re.sub(pattern, replacement, content, flags=re.M)
@ -21,7 +25,7 @@ def replace_in_file(filename, pattern, replacement):
f.truncate()
def is_empty_and_create(path, action):
def is_empty_and_create(path: str, action: str) -> None:
abspath = os.path.abspath(path)
if not os.path.exists(abspath):
os.makedirs(abspath)
@ -35,7 +39,7 @@ def is_empty_and_create(path, action):
sys.exit(3)
def create_project(target_path, name):
def create_project(target_path: str, name: str) -> None:
copy_tree(os.path.join(os.environ['IDF_PATH'], 'examples', 'get-started', 'sample_project'), target_path)
main_folder = os.path.join(target_path, 'main')
os.rename(os.path.join(main_folder, 'main.c'), os.path.join(main_folder, '.'.join((name, 'c'))))
@ -44,7 +48,7 @@ def create_project(target_path, name):
os.remove(os.path.join(target_path, 'README.md'))
def create_component(target_path, name):
def create_component(target_path: str, name: str) -> None:
copy_tree(os.path.join(os.environ['IDF_PATH'], 'tools', 'templates', 'sample_component'), target_path)
os.rename(os.path.join(target_path, 'main.c'), os.path.join(target_path, '.'.join((name, 'c'))))
os.rename(os.path.join(target_path, 'include', 'main.h'),
@ -54,8 +58,8 @@ def create_component(target_path, name):
replace_in_file(os.path.join(target_path, 'CMakeLists.txt'), 'main', name)
def action_extensions(base_actions, project_path):
def create_new(action, ctx, global_args, **action_args):
def action_extensions(base_actions: Dict, project_path: str) -> Dict:
def create_new(action: str, ctx: click.core.Context, global_args: PropertyDict, **action_args: str) -> Dict:
target_path = action_args.get('path') or os.path.join(project_path, action_args['name'])
is_empty_and_create(target_path, action)

View File

@ -9,21 +9,22 @@ import sys
import threading
import time
from threading import Thread
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
from click.core import Context
from idf_py_actions.errors import FatalError
from idf_py_actions.tools import ensure_build_directory
from idf_py_actions.tools import PropertyDict, ensure_build_directory
PYTHON = sys.executable
def action_extensions(base_actions, project_path):
def action_extensions(base_actions: Dict, project_path: str) -> Dict:
OPENOCD_OUT_FILE = 'openocd_out.txt'
GDBGUI_OUT_FILE = 'gdbgui_out.txt'
# Internal dictionary of currently active processes, threads and their output files
processes = {'threads_to_join': [], 'openocd_issues': None}
processes: Dict = {'threads_to_join': [], 'openocd_issues': None}
def _check_for_common_openocd_issues(file_name, print_all=True):
def _check_for_common_openocd_issues(file_name: str, print_all: bool=True) -> Any:
if processes['openocd_issues'] is not None:
return processes['openocd_issues']
try:
@ -39,7 +40,7 @@ def action_extensions(base_actions, project_path):
processes['openocd_issues'] = message
return message
def _check_openocd_errors(fail_if_openocd_failed, target, ctx):
def _check_openocd_errors(fail_if_openocd_failed: Dict, target: str, ctx: Context) -> None:
if fail_if_openocd_failed:
if 'openocd' in processes and processes['openocd'] is not None:
p = processes['openocd']
@ -62,7 +63,7 @@ def action_extensions(base_actions, project_path):
# OpenOCD exited or error message detected -> print possible output and terminate
raise FatalError('Action "{}" failed due to errors in OpenOCD:\n{}'.format(target, _check_for_common_openocd_issues(name)), ctx)
def _terminate_async_target(target):
def _terminate_async_target(target: str) -> None:
if target in processes and processes[target] is not None:
try:
if target + '_outfile' in processes:
@ -86,11 +87,11 @@ def action_extensions(base_actions, project_path):
print('Failed to close/kill {}'.format(target))
processes[target] = None # to indicate this has ended
def is_gdb_with_python(gdb):
def is_gdb_with_python(gdb: str) -> bool:
# execute simple python command to check is it supported
return subprocess.run([gdb, '--batch-silent', '--ex', 'python import os'], stderr=subprocess.DEVNULL).returncode == 0
def create_local_gdbinit(gdb, gdbinit, elf_file):
def create_local_gdbinit(gdb: str, gdbinit: str, elf_file: str) -> None:
with open(gdbinit, 'w') as f:
if is_gdb_with_python(gdb):
f.write('python\n')
@ -107,7 +108,7 @@ def action_extensions(base_actions, project_path):
f.write('thb app_main\n')
f.write('c\n')
def debug_cleanup():
def debug_cleanup() -> None:
print('cleaning up debug targets')
for t in processes['threads_to_join']:
if threading.currentThread() != t:
@ -116,7 +117,7 @@ def action_extensions(base_actions, project_path):
_terminate_async_target('gdbgui')
_terminate_async_target('gdb')
def post_debug(action, ctx, args, **kwargs):
def post_debug(action: str, ctx: Context, args: PropertyDict, **kwargs: str) -> None:
""" Deal with asynchronous targets, such as openocd running in background """
if kwargs['block'] == 1:
for target in ['openocd', 'gdbgui']:
@ -143,7 +144,7 @@ def action_extensions(base_actions, project_path):
_terminate_async_target('openocd')
_terminate_async_target('gdbgui')
def get_project_desc(args, ctx):
def get_project_desc(args: PropertyDict, ctx: Context) -> Any:
desc_path = os.path.join(args.build_dir, 'project_description.json')
if not os.path.exists(desc_path):
ensure_build_directory(args, ctx.info_name)
@ -151,7 +152,7 @@ def action_extensions(base_actions, project_path):
project_desc = json.load(f)
return project_desc
def openocd(action, ctx, args, openocd_scripts, openocd_commands):
def openocd(action: str, ctx: Context, args: PropertyDict, openocd_scripts: Optional[str], openocd_commands: str) -> None:
"""
Execute openocd as external tool
"""
@ -188,14 +189,14 @@ def action_extensions(base_actions, project_path):
processes['openocd_outfile_name'] = openocd_out_name
print('OpenOCD started as a background task {}'.format(process.pid))
def get_gdb_args(gdbinit, project_desc: Dict[str, Any]) -> List[str]:
def get_gdb_args(gdbinit: str, project_desc: Dict[str, Any]) -> List:
args = ['-x={}'.format(gdbinit)]
debug_prefix_gdbinit = project_desc.get('debug_prefix_map_gdbinit')
if debug_prefix_gdbinit:
args.append('-ix={}'.format(debug_prefix_gdbinit))
return args
def gdbui(action, ctx, args, gdbgui_port, gdbinit, require_openocd):
def gdbui(action: str, ctx: Context, args: PropertyDict, gdbgui_port: Optional[str], gdbinit: Optional[str], require_openocd: bool) -> None:
"""
Asynchronous GDB-UI target
"""
@ -211,8 +212,8 @@ def action_extensions(base_actions, project_path):
# - '"-x=foo -x=bar"', would return ['foo bar']
# - '-x=foo', would return ['-x', 'foo'] and mess up the former option '--gdb-args'
# so for one item, use extra double quotes. for more items, use no extra double quotes.
gdb_args = get_gdb_args(gdbinit, project_desc)
gdb_args = '"{}"'.format(' '.join(gdb_args)) if len(gdb_args) == 1 else ' '.join(gdb_args)
gdb_args_list = get_gdb_args(gdbinit, project_desc)
gdb_args = '"{}"'.format(' '.join(gdb_args_list)) if len(gdb_args_list) == 1 else ' '.join(gdb_args_list)
args = ['gdbgui', '-g', gdb, '--gdb-args', gdb_args]
print(args)
@ -238,8 +239,8 @@ def action_extensions(base_actions, project_path):
print('gdbgui started as a background task {}'.format(process.pid))
_check_openocd_errors(fail_if_openocd_failed, action, ctx)
def global_callback(ctx, global_args, tasks):
def move_to_front(task_name):
def global_callback(ctx: Context, global_args: PropertyDict, tasks: List) -> None:
def move_to_front(task_name: str) -> None:
for index, task in enumerate(tasks):
if task.name == task_name:
tasks.insert(0, tasks.pop(index))
@ -264,18 +265,18 @@ def action_extensions(base_actions, project_path):
if task.name in ('gdb', 'gdbgui', 'gdbtui'):
task.action_args['require_openocd'] = True
def run_gdb(gdb_args):
def run_gdb(gdb_args: List) -> int:
p = subprocess.Popen(gdb_args)
processes['gdb'] = p
return p.wait()
def gdbtui(action, ctx, args, gdbinit, require_openocd):
def gdbtui(action: str, ctx: Context, args: PropertyDict, gdbinit: str, require_openocd: bool) -> None:
"""
Synchronous GDB target with text ui mode
"""
gdb(action, ctx, args, 1, gdbinit, require_openocd)
def gdb(action, ctx, args, gdb_tui, gdbinit, require_openocd):
def gdb(action: str, ctx: Context, args: PropertyDict, gdb_tui: Optional[int], gdbinit: Optional[str], require_openocd: bool) -> None:
"""
Synchronous GDB target
"""

View File

@ -1,22 +1,25 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from typing import Dict
from click.core import Context
from idf_py_actions.errors import FatalError
from idf_py_actions.tools import ensure_build_directory, is_target_supported, run_target
from idf_py_actions.tools import PropertyDict, ensure_build_directory, is_target_supported, run_target
def action_extensions(base_actions, project_path):
def action_extensions(base_actions: Dict, project_path: str) -> Dict:
SUPPORTED_TARGETS = ['esp32s2']
def dfu_target(target_name, ctx, args, part_size):
def dfu_target(target_name: str, ctx: Context, args: PropertyDict, part_size: str) -> None:
ensure_build_directory(args, ctx.info_name)
run_target(target_name, args, {'ESP_DFU_PART_SIZE': part_size} if part_size else {})
def dfu_list_target(target_name, ctx, args):
def dfu_list_target(target_name: str, ctx: Context, args: PropertyDict) -> None:
ensure_build_directory(args, ctx.info_name)
run_target(target_name, args)
def dfu_flash_target(target_name, ctx, args, path):
def dfu_flash_target(target_name: str, ctx: Context, args: PropertyDict, path: str) -> None:
ensure_build_directory(args, ctx.info_name)
try:

View File

@ -1,11 +1,14 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from click.core import Context
class FatalError(RuntimeError):
"""
Wrapper class for runtime errors that aren't caused by bugs in idf.py or the build process.
"""
def __init__(self, message, ctx=None):
def __init__(self, message: str, ctx: Context=None):
super(RuntimeError, self).__init__(message)
# if context is defined, check for the cleanup tasks
if ctx is not None and 'cleanup' in ctx.meta:

View File

@ -4,18 +4,19 @@
import json
import os
import sys
from typing import Any, Dict, List
import click
from idf_monitor_base.output_helpers import yellow_print
from idf_py_actions.errors import FatalError, NoSerialPortFoundError
from idf_py_actions.global_options import global_options
from idf_py_actions.tools import ensure_build_directory, get_sdkconfig_value, run_target, run_tool
from idf_py_actions.tools import PropertyDict, ensure_build_directory, get_sdkconfig_value, run_target, run_tool
PYTHON = sys.executable
def action_extensions(base_actions, project_path):
def _get_project_desc(ctx, args):
def action_extensions(base_actions: Dict, project_path: str) -> Dict:
def _get_project_desc(ctx: click.core.Context, args: PropertyDict) -> Any:
desc_path = os.path.join(args.build_dir, 'project_description.json')
if not os.path.exists(desc_path):
ensure_build_directory(args, ctx.info_name)
@ -23,7 +24,7 @@ def action_extensions(base_actions, project_path):
project_desc = json.load(f)
return project_desc
def _get_default_serial_port(args):
def _get_default_serial_port(args: PropertyDict) -> Any:
# Import is done here in order to move it after the check_environment() ensured that pyserial has been installed
try:
import esptool
@ -45,7 +46,7 @@ def action_extensions(base_actions, project_path):
except Exception as e:
raise FatalError('An exception occurred during detection of the serial port: {}'.format(e))
def _get_esptool_args(args):
def _get_esptool_args(args: PropertyDict) -> List:
esptool_path = os.path.join(os.environ['IDF_PATH'], 'components/esptool_py/esptool/esptool.py')
esptool_wrapper_path = os.environ.get('ESPTOOL_WRAPPER', '')
if args.port is None:
@ -68,7 +69,7 @@ def action_extensions(base_actions, project_path):
result += ['--no-stub']
return result
def _get_commandline_options(ctx):
def _get_commandline_options(ctx: click.core.Context) -> List:
""" Return all the command line options up to first action """
# This approach ignores argument parsing done Click
result = []
@ -81,7 +82,8 @@ def action_extensions(base_actions, project_path):
return result
def monitor(action, ctx, args, print_filter, monitor_baud, encrypted, no_reset, timestamps, timestamp_format):
def monitor(action: str, ctx: click.core.Context, args: PropertyDict, print_filter: str, monitor_baud: str, encrypted: bool,
no_reset: bool, timestamps: bool, timestamp_format: str) -> None:
"""
Run idf_monitor.py to watch build output
"""
@ -152,7 +154,7 @@ def action_extensions(base_actions, project_path):
run_tool('idf_monitor', monitor_args, args.project_dir)
def flash(action, ctx, args):
def flash(action: str, ctx: click.core.Context, args: PropertyDict) -> None:
"""
Run esptool to flash the entire project, from an argfile generated by the build system
"""
@ -165,13 +167,13 @@ def action_extensions(base_actions, project_path):
esp_port = args.port or _get_default_serial_port(args)
run_target(action, args, {'ESPBAUD': str(args.baud), 'ESPPORT': esp_port})
def erase_flash(action, ctx, args):
def erase_flash(action: str, ctx: click.core.Context, args: PropertyDict) -> None:
ensure_build_directory(args, ctx.info_name)
esptool_args = _get_esptool_args(args)
esptool_args += ['erase_flash']
run_tool('esptool.py', esptool_args, args.build_dir)
def global_callback(ctx, global_args, tasks):
def global_callback(ctx: click.core.Context, global_args: Dict, tasks: PropertyDict) -> None:
encryption = any([task.name in ('encrypted-flash', 'encrypted-app-flash') for task in tasks])
if encryption:
for task in tasks:
@ -179,7 +181,7 @@ def action_extensions(base_actions, project_path):
task.action_args['encrypted'] = True
break
def ota_targets(target_name, ctx, args):
def ota_targets(target_name: str, ctx: click.core.Context, args: PropertyDict) -> None:
"""
Execute the target build system to build target 'target_name'.
Additionally set global variables for baud and port.

View File

@ -5,6 +5,7 @@ import re
import subprocess
import sys
from io import open
from typing import Any, List
import click
@ -340,12 +341,12 @@ class TargetChoice(click.Choice):
- ignores hyphens
- not case sensitive
"""
def __init__(self, choices):
def __init__(self, choices: List) -> None:
super(TargetChoice, self).__init__(choices, case_sensitive=False)
def convert(self, value, param, ctx):
def normalize(str):
return str.lower().replace('-', '')
def convert(self, value: Any, param: click.Parameter, ctx: click.Context) -> Any:
def normalize(string: str) -> str:
return string.lower().replace('-', '')
saved_token_normalize_func = ctx.token_normalize_func
ctx.token_normalize_func = normalize
@ -354,3 +355,20 @@ class TargetChoice(click.Choice):
return super(TargetChoice, self).convert(value, param, ctx)
finally:
ctx.token_normalize_func = saved_token_normalize_func
class PropertyDict(dict):
def __getattr__(self, name: str) -> Any:
if name in self:
return self[name]
else:
raise AttributeError("'PropertyDict' object has no attribute '%s'" % name)
def __setattr__(self, name: str, value: Any) -> None:
self[name] = value
def __delattr__(self, name: str) -> None:
if name in self:
del self[name]
else:
raise AttributeError("'PropertyDict' object has no attribute '%s'" % name)

View File

@ -1,10 +1,13 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from idf_py_actions.tools import ensure_build_directory, run_target
from typing import Dict, List
from click.core import Context
from idf_py_actions.tools import PropertyDict, ensure_build_directory, run_target
def action_extensions(base_actions, project_path):
def uf2_target(target_name, ctx, args):
def action_extensions(base_actions: Dict, project_path: List) -> Dict:
def uf2_target(target_name: str, ctx: Context, args: PropertyDict) -> None:
ensure_build_directory(args, ctx.info_name)
run_target(target_name, args)

View File

@ -11,11 +11,12 @@ import json
import os
import struct
from functools import partial
from typing import Dict, List
from future.utils import iteritems
def round_up_int_div(n, d):
def round_up_int_div(n: int, d: int) -> int:
# equivalent to math.ceil(n / d)
return (n + d - 1) // d
@ -32,23 +33,23 @@ class UF2Writer(object):
UF2_FLAG_FAMILYID_PRESENT = 0x00002000
UF2_FLAG_MD5_PRESENT = 0x00004000
def __init__(self, chip_id, output_file, chunk_size):
def __init__(self, chip_id: int, output_file: os.PathLike, chunk_size: int) -> None:
self.chip_id = chip_id
self.CHUNK_SIZE = self.UF2_DATA_SIZE - self.UF2_MD5_PART_SIZE if chunk_size is None else chunk_size
self.f = open(output_file, 'wb')
def __enter__(self):
def __enter__(self) -> 'UF2Writer':
return self
def __exit__(self, exc_type, exc_val, exc_tb):
def __exit__(self, exc_type: str, exc_val: int, exc_tb: List) -> None:
if self.f:
self.f.close()
@staticmethod
def _to_uint32(num):
def _to_uint32(num: int) -> bytes:
return struct.pack('<I', num)
def _write_block(self, addr, chunk, len_chunk, block_no, blocks):
def _write_block(self, addr: int, chunk: bytes, len_chunk: int, block_no: int, blocks: int) -> None:
assert len_chunk > 0
assert len_chunk <= self.CHUNK_SIZE
assert block_no < blocks
@ -73,7 +74,7 @@ class UF2Writer(object):
assert len(block) == self.UF2_BLOCK_SIZE
self.f.write(block)
def add_file(self, addr, f_path):
def add_file(self, addr: int, f_path: os.PathLike) -> None:
blocks = round_up_int_div(os.path.getsize(f_path), self.CHUNK_SIZE)
with open(f_path, 'rb') as fin:
a = addr
@ -83,7 +84,7 @@ class UF2Writer(object):
a += len_chunk
def action_write(args):
def action_write(args: Dict) -> None:
with UF2Writer(args['chip_id'], args['output_file'], args['chunk_size']) as writer:
for addr, f in args['files']:
print('Adding {} at {:#x}'.format(f, addr))
@ -91,19 +92,19 @@ def action_write(args):
print('"{}" has been written.'.format(args['output_file']))
def main():
def main() -> None:
parser = argparse.ArgumentParser()
def four_byte_aligned(integer):
def four_byte_aligned(integer: int) -> bool:
return integer & 3 == 0
def parse_chunk_size(string):
def parse_chunk_size(string: str) -> int:
num = int(string, 0)
if not four_byte_aligned(num):
raise argparse.ArgumentTypeError('Chunk size should be a 4-byte aligned number')
return num
def parse_chip_id(string):
def parse_chip_id(string: str) -> int:
num = int(string, 16)
if num < 0 or num > 0xFFFFFFFF:
raise argparse.ArgumentTypeError('Chip ID should be a 4-byte unsigned integer')
@ -137,12 +138,12 @@ def main():
args = parser.parse_args()
def check_file(file_name):
def check_file(file_name: str) -> str:
if not os.path.isfile(file_name):
raise RuntimeError('{} is not a regular file!'.format(file_name))
return file_name
def parse_addr(string):
def parse_addr(string: str) -> int:
num = int(string, 0)
if not four_byte_aligned(num):
raise RuntimeError('{} is not a 4-byte aligned valid address'.format(string))
@ -155,7 +156,7 @@ def main():
if args.json:
json_dir = os.path.dirname(os.path.abspath(args.json))
def process_json_file(path):
def process_json_file(path: str) -> str:
'''
The input path is relative to json_dir. This function makes it relative to the current working
directory.