mirror of
https://github.com/espressif/esp-idf.git
synced 2024-10-05 20:47:46 -04:00
tools: Add python types hints
This commit is contained in:
parent
b20aa0612b
commit
44f3c19fa9
@ -211,15 +211,7 @@ tools/find_apps.py
|
||||
tools/find_build_apps/common.py
|
||||
tools/gen_esp_err_to_name.py
|
||||
tools/gen_soc_caps_kconfig/test/test_gen_soc_caps_kconfig.py
|
||||
tools/idf.py
|
||||
tools/idf_py_actions/core_ext.py
|
||||
tools/idf_py_actions/create_ext.py
|
||||
tools/idf_py_actions/debug_ext.py
|
||||
tools/idf_py_actions/dfu_ext.py
|
||||
tools/idf_py_actions/errors.py
|
||||
tools/idf_py_actions/serial_ext.py
|
||||
tools/idf_py_actions/tools.py
|
||||
tools/idf_py_actions/uf2_ext.py
|
||||
tools/kconfig_new/confgen.py
|
||||
tools/kconfig_new/confserver.py
|
||||
tools/kconfig_new/gen_kconfig_doc.py
|
||||
@ -240,7 +232,6 @@ tools/ldgen/test/test_fragments.py
|
||||
tools/ldgen/test/test_generation.py
|
||||
tools/ldgen/test/test_output_commands.py
|
||||
tools/mass_mfg/mfg_gen.py
|
||||
tools/mkuf2.py
|
||||
tools/test_apps/build_system/ldgen_test/check_placements.py
|
||||
tools/test_apps/protocols/mqtt/publish_connect_test/app_test.py
|
||||
tools/test_apps/protocols/openssl/app_test.py
|
||||
|
@ -6,6 +6,7 @@
|
||||
from __future__ import print_function, unicode_literals
|
||||
|
||||
import sys
|
||||
from typing import Any, List, Optional, TextIO
|
||||
|
||||
try:
|
||||
from builtins import object, range, str
|
||||
@ -57,7 +58,7 @@ class ErrItem(object):
|
||||
- rel_str - (optional) error string which is a base for the error
|
||||
- rel_off - (optional) offset in relation to the base error
|
||||
"""
|
||||
def __init__(self, name, file, include_as=None, comment='', rel_str='', rel_off=0):
|
||||
def __init__(self, name: str, file: str, include_as: Optional[Any]=None, comment: str='', rel_str: str='', rel_off: int=0) -> None:
|
||||
self.name = name
|
||||
self.file = file
|
||||
self.include_as = include_as
|
||||
@ -65,7 +66,7 @@ class ErrItem(object):
|
||||
self.rel_str = rel_str
|
||||
self.rel_off = rel_off
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
ret = self.name + ' from ' + self.file
|
||||
if (self.rel_str != ''):
|
||||
ret += ' is (' + self.rel_str + ' + ' + str(self.rel_off) + ')'
|
||||
@ -73,7 +74,7 @@ class ErrItem(object):
|
||||
ret += ' // ' + self.comment
|
||||
return ret
|
||||
|
||||
def __cmp__(self, other):
|
||||
def __cmp__(self, other) -> int:
|
||||
if self.file in priority_headers and other.file not in priority_headers:
|
||||
return -1
|
||||
elif self.file not in priority_headers and other.file in priority_headers:
|
||||
@ -101,11 +102,11 @@ class InputError(RuntimeError):
|
||||
"""
|
||||
Represents and error on the input
|
||||
"""
|
||||
def __init__(self, p, e):
|
||||
def __init__(self, p: str, e: str) -> None:
|
||||
super(InputError, self).__init__(p + ': ' + e)
|
||||
|
||||
|
||||
def process(line, idf_path, include_as):
|
||||
def process(line: str, idf_path: str, include_as: Any) -> None:
|
||||
"""
|
||||
Process a line of text from file idf_path (relative to IDF project).
|
||||
Fills the global list unproc_list and dictionaries err_dict, rev_err_dict
|
||||
@ -168,7 +169,7 @@ def process(line, idf_path, include_as):
|
||||
unproc_list.append(ErrItem(words[1], idf_path, include_as, comment, related, num))
|
||||
|
||||
|
||||
def process_remaining_errors():
|
||||
def process_remaining_errors() -> None:
|
||||
"""
|
||||
Create errors which could not be processed before because the error code
|
||||
for the BASE error code wasn't known.
|
||||
@ -189,7 +190,7 @@ def process_remaining_errors():
|
||||
del unproc_list[:]
|
||||
|
||||
|
||||
def path_to_include(path):
|
||||
def path_to_include(path: str) -> str:
|
||||
"""
|
||||
Process the path (relative to the IDF project) in a form which can be used
|
||||
to include in a C file. Using just the filename does not work all the
|
||||
@ -210,7 +211,7 @@ def path_to_include(path):
|
||||
return os.sep.join(spl_path[i + 1:]) # subdirectories and filename in "include"
|
||||
|
||||
|
||||
def print_warning(error_list, error_code):
|
||||
def print_warning(error_list: List, error_code: int) -> None:
|
||||
"""
|
||||
Print warning about errors with the same error code
|
||||
"""
|
||||
@ -219,7 +220,7 @@ def print_warning(error_list, error_code):
|
||||
print(' ' + str(e))
|
||||
|
||||
|
||||
def max_string_width():
|
||||
def max_string_width() -> int:
|
||||
max = 0
|
||||
for k in err_dict:
|
||||
for e in err_dict[k]:
|
||||
@ -229,7 +230,7 @@ def max_string_width():
|
||||
return max
|
||||
|
||||
|
||||
def generate_c_output(fin, fout):
|
||||
def generate_c_output(fin: TextIO, fout: TextIO) -> None:
|
||||
"""
|
||||
Writes the output to fout based on th error dictionary err_dict and
|
||||
template file fin.
|
||||
@ -294,7 +295,7 @@ def generate_c_output(fin, fout):
|
||||
fout.write(line)
|
||||
|
||||
|
||||
def generate_rst_output(fout):
|
||||
def generate_rst_output(fout: TextIO) -> None:
|
||||
for k in sorted(err_dict.keys()):
|
||||
v = err_dict[k][0]
|
||||
fout.write(':c:macro:`{}` '.format(v.name))
|
||||
@ -307,7 +308,7 @@ def generate_rst_output(fout):
|
||||
fout.write('\n\n')
|
||||
|
||||
|
||||
def main():
|
||||
def main() -> None:
|
||||
if 'IDF_PATH' in os.environ:
|
||||
idf_path = os.environ['IDF_PATH']
|
||||
else:
|
||||
|
146
tools/idf.py
146
tools/idf.py
@ -13,7 +13,7 @@
|
||||
# check_environment() function below. If possible, avoid importing
|
||||
# any external libraries here - put in external script, or import in
|
||||
# their specific function instead.
|
||||
from __future__ import print_function
|
||||
from __future__ import annotations
|
||||
|
||||
import codecs
|
||||
import json
|
||||
@ -23,9 +23,11 @@ import os.path
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
from collections import Counter, OrderedDict
|
||||
from collections import Counter, OrderedDict, _OrderedDictKeysView
|
||||
from importlib import import_module
|
||||
from pkgutil import iter_modules
|
||||
from types import FrameType
|
||||
from typing import Any, Callable, Dict, List, Optional, TextIO, Union
|
||||
|
||||
# pyc files remain in the filesystem when switching between branches which might raise errors for incompatible
|
||||
# idf.py extensions. Therefore, pyc file generation is turned off:
|
||||
@ -35,7 +37,8 @@ import python_version_checker # noqa: E402
|
||||
|
||||
try:
|
||||
from idf_py_actions.errors import FatalError # noqa: E402
|
||||
from idf_py_actions.tools import executable_exists, idf_version, merge_action_lists, realpath # noqa: E402
|
||||
from idf_py_actions.tools import (PropertyDict, executable_exists, idf_version, merge_action_lists, # noqa: E402
|
||||
realpath)
|
||||
except ImportError:
|
||||
# For example, importing click could cause this.
|
||||
print('Please use idf.py only in an ESP-IDF shell environment.', file=sys.stderr)
|
||||
@ -61,12 +64,12 @@ SHELL_COMPLETE_RUN = SHELL_COMPLETE_VAR in os.environ
|
||||
|
||||
# function prints warning when autocompletion is not being performed
|
||||
# set argument stream to sys.stderr for errors and exceptions
|
||||
def print_warning(message, stream=None):
|
||||
def print_warning(message: str, stream: TextIO=None) -> None:
|
||||
if not SHELL_COMPLETE_RUN:
|
||||
print(message, file=stream or sys.stderr)
|
||||
|
||||
|
||||
def check_environment():
|
||||
def check_environment() -> List:
|
||||
"""
|
||||
Verify the environment contains the top-level tools we need to operate
|
||||
|
||||
@ -121,7 +124,7 @@ def check_environment():
|
||||
return checks_output
|
||||
|
||||
|
||||
def _safe_relpath(path, start=None):
|
||||
def _safe_relpath(path: str, start: Optional[str]=None) -> str:
|
||||
""" Return a relative path, same as os.path.relpath, but only if this is possible.
|
||||
|
||||
It is not possible on Windows, if the start directory and the path are on different drives.
|
||||
@ -132,7 +135,7 @@ def _safe_relpath(path, start=None):
|
||||
return os.path.abspath(path)
|
||||
|
||||
|
||||
def debug_print_idf_version():
|
||||
def debug_print_idf_version() -> None:
|
||||
version = idf_version()
|
||||
if version:
|
||||
print_warning('ESP-IDF %s' % version)
|
||||
@ -140,30 +143,13 @@ def debug_print_idf_version():
|
||||
print_warning('ESP-IDF version unknown')
|
||||
|
||||
|
||||
class PropertyDict(dict):
|
||||
def __getattr__(self, name):
|
||||
if name in self:
|
||||
return self[name]
|
||||
else:
|
||||
raise AttributeError("'PropertyDict' object has no attribute '%s'" % name)
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
self[name] = value
|
||||
|
||||
def __delattr__(self, name):
|
||||
if name in self:
|
||||
del self[name]
|
||||
else:
|
||||
raise AttributeError("'PropertyDict' object has no attribute '%s'" % name)
|
||||
|
||||
|
||||
def init_cli(verbose_output=None):
|
||||
def init_cli(verbose_output: List=None) -> Any:
|
||||
# Click is imported here to run it after check_environment()
|
||||
import click
|
||||
|
||||
class Deprecation(object):
|
||||
"""Construct deprecation notice for help messages"""
|
||||
def __init__(self, deprecated=False):
|
||||
def __init__(self, deprecated: Union[Dict, str, bool]=False) -> None:
|
||||
self.deprecated = deprecated
|
||||
self.since = None
|
||||
self.removed = None
|
||||
@ -178,7 +164,7 @@ def init_cli(verbose_output=None):
|
||||
elif isinstance(deprecated, str):
|
||||
self.custom_message = deprecated
|
||||
|
||||
def full_message(self, type='Option'):
|
||||
def full_message(self, type: str='Option') -> str:
|
||||
if self.exit_with_error:
|
||||
return '%s is deprecated %sand was removed%s.%s' % (
|
||||
type,
|
||||
@ -194,15 +180,15 @@ def init_cli(verbose_output=None):
|
||||
' %s' % self.custom_message if self.custom_message else '',
|
||||
)
|
||||
|
||||
def help(self, text, type='Option', separator=' '):
|
||||
def help(self, text: str, type: str='Option', separator: str=' ') -> str:
|
||||
text = text or ''
|
||||
return self.full_message(type) + separator + text if self.deprecated else text
|
||||
|
||||
def short_help(self, text):
|
||||
def short_help(self, text: str) -> str:
|
||||
text = text or ''
|
||||
return ('Deprecated! ' + text) if self.deprecated else text
|
||||
|
||||
def check_deprecation(ctx):
|
||||
def check_deprecation(ctx: click.core.Context) -> None:
|
||||
"""Prints deprecation warnings for arguments in given context"""
|
||||
for option in ctx.command.params:
|
||||
default = () if option.multiple else option.default
|
||||
@ -214,7 +200,8 @@ def init_cli(verbose_output=None):
|
||||
print_warning('Warning: %s' % deprecation.full_message('Option "%s"' % option.name))
|
||||
|
||||
class Task(object):
|
||||
def __init__(self, callback, name, aliases, dependencies, order_dependencies, action_args):
|
||||
def __init__(self, callback: Callable, name: str, aliases: List, dependencies: Optional[List],
|
||||
order_dependencies: Optional[List], action_args: Dict) -> None:
|
||||
self.callback = callback
|
||||
self.name = name
|
||||
self.dependencies = dependencies
|
||||
@ -222,7 +209,7 @@ def init_cli(verbose_output=None):
|
||||
self.action_args = action_args
|
||||
self.aliases = aliases
|
||||
|
||||
def __call__(self, context, global_args, action_args=None):
|
||||
def __call__(self, context: click.core.Context, global_args: PropertyDict, action_args: Dict=None) -> None:
|
||||
if action_args is None:
|
||||
action_args = self.action_args
|
||||
|
||||
@ -231,26 +218,24 @@ def init_cli(verbose_output=None):
|
||||
class Action(click.Command):
|
||||
def __init__(
|
||||
self,
|
||||
name=None,
|
||||
aliases=None,
|
||||
deprecated=False,
|
||||
dependencies=None,
|
||||
order_dependencies=None,
|
||||
hidden=False,
|
||||
**kwargs):
|
||||
name: Optional[str]=None,
|
||||
aliases: Optional[List]=None,
|
||||
deprecated: Union[Dict, str, bool]=False,
|
||||
dependencies: Optional[List]=None,
|
||||
order_dependencies: Optional[List]=None,
|
||||
hidden: bool=False,
|
||||
**kwargs: Any) -> None:
|
||||
super(Action, self).__init__(name, **kwargs)
|
||||
|
||||
self.name = self.name or self.callback.__name__
|
||||
self.deprecated = deprecated
|
||||
self.hidden = hidden
|
||||
self.name: str = self.name or self.callback.__name__
|
||||
self.deprecated: Union[Dict, str, bool] = deprecated
|
||||
self.hidden: bool = hidden
|
||||
|
||||
if aliases is None:
|
||||
aliases = []
|
||||
self.aliases = aliases
|
||||
|
||||
self.help = self.help or self.callback.__doc__
|
||||
if self.help is None:
|
||||
self.help = ''
|
||||
self.help: str = self.help or self.callback.__doc__ or ''
|
||||
|
||||
if dependencies is None:
|
||||
dependencies = []
|
||||
@ -259,7 +244,7 @@ def init_cli(verbose_output=None):
|
||||
order_dependencies = []
|
||||
|
||||
# Show first line of help if short help is missing
|
||||
self.short_help = self.short_help or self.help.split('\n')[0]
|
||||
self.short_help: str = self.short_help or self.help.split('\n')[0]
|
||||
|
||||
if deprecated:
|
||||
deprecation = Deprecation(deprecated)
|
||||
@ -276,7 +261,7 @@ def init_cli(verbose_output=None):
|
||||
self.unwrapped_callback = self.callback
|
||||
if self.callback is not None:
|
||||
|
||||
def wrapped_callback(**action_args):
|
||||
def wrapped_callback(**action_args: Any) -> Task:
|
||||
return Task(
|
||||
callback=self.unwrapped_callback,
|
||||
name=self.name,
|
||||
@ -288,7 +273,7 @@ def init_cli(verbose_output=None):
|
||||
|
||||
self.callback = wrapped_callback
|
||||
|
||||
def invoke(self, ctx):
|
||||
def invoke(self, ctx: click.core.Context) -> click.core.Context:
|
||||
if self.deprecated:
|
||||
deprecation = Deprecation(self.deprecated)
|
||||
message = deprecation.full_message('Command "%s"' % self.name)
|
||||
@ -310,7 +295,7 @@ def init_cli(verbose_output=None):
|
||||
|
||||
names - alias of 'param_decls'
|
||||
"""
|
||||
def __init__(self, **kwargs):
|
||||
def __init__(self, **kwargs: str):
|
||||
names = kwargs.pop('names')
|
||||
super(Argument, self).__init__(names, **kwargs)
|
||||
|
||||
@ -325,7 +310,7 @@ def init_cli(verbose_output=None):
|
||||
|
||||
SCOPES = ('default', 'global', 'shared')
|
||||
|
||||
def __init__(self, scope=None):
|
||||
def __init__(self, scope: Union['Scope', str]=None) -> None:
|
||||
if scope is None:
|
||||
self._scope = 'default'
|
||||
elif isinstance(scope, str) and scope in self.SCOPES:
|
||||
@ -336,19 +321,19 @@ def init_cli(verbose_output=None):
|
||||
raise FatalError('Unknown scope for option: %s' % scope)
|
||||
|
||||
@property
|
||||
def is_global(self):
|
||||
def is_global(self) -> bool:
|
||||
return self._scope == 'global'
|
||||
|
||||
@property
|
||||
def is_shared(self):
|
||||
def is_shared(self) -> bool:
|
||||
return self._scope == 'shared'
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
return self._scope
|
||||
|
||||
class Option(click.Option):
|
||||
"""Option that knows whether it should be global"""
|
||||
def __init__(self, scope=None, deprecated=False, hidden=False, **kwargs):
|
||||
def __init__(self, scope: Union[Scope, str]=None, deprecated: Union[Dict, str, bool]=False, hidden: bool=False, **kwargs: str) -> None:
|
||||
"""
|
||||
Keyword arguments additional to Click's Option class:
|
||||
|
||||
@ -369,7 +354,7 @@ def init_cli(verbose_output=None):
|
||||
|
||||
if deprecated:
|
||||
deprecation = Deprecation(deprecated)
|
||||
self.help = deprecation.help(self.help)
|
||||
self.help: str = deprecation.help(self.help)
|
||||
|
||||
if self.envvar:
|
||||
self.help += ' The default value can be set with the %s environment variable.' % self.envvar
|
||||
@ -377,16 +362,16 @@ def init_cli(verbose_output=None):
|
||||
if self.scope.is_global:
|
||||
self.help += ' This option can be used at most once either globally, or for one subcommand.'
|
||||
|
||||
def get_help_record(self, ctx):
|
||||
def get_help_record(self, ctx: click.core.Context) -> Any:
|
||||
# Backport "hidden" parameter to click 5.0
|
||||
if self.hidden:
|
||||
return
|
||||
return None
|
||||
|
||||
return super(Option, self).get_help_record(ctx)
|
||||
|
||||
class CLI(click.MultiCommand):
|
||||
"""Action list contains all actions with options available for CLI"""
|
||||
def __init__(self, all_actions=None, verbose_output=None, help=None):
|
||||
def __init__(self, all_actions: Dict=None, verbose_output: List=None, help: str=None) -> None:
|
||||
super(CLI, self).__init__(
|
||||
chain=True,
|
||||
invoke_without_command=True,
|
||||
@ -455,18 +440,21 @@ def init_cli(verbose_output=None):
|
||||
|
||||
self._actions[name].params.append(option)
|
||||
|
||||
def list_commands(self, ctx):
|
||||
def list_commands(self, ctx: click.core.Context) -> List:
|
||||
return sorted(filter(lambda name: not self._actions[name].hidden, self._actions))
|
||||
|
||||
def get_command(self, ctx, name):
|
||||
def get_command(self, ctx: click.core.Context, name: str) -> Optional[Action]:
|
||||
if name in self.commands_with_aliases:
|
||||
return self._actions.get(self.commands_with_aliases.get(name))
|
||||
|
||||
# Trying fallback to build target (from "all" action) if command is not known
|
||||
else:
|
||||
return Action(name=name, callback=self._actions.get('fallback').unwrapped_callback)
|
||||
callback = self._actions.get('fallback')
|
||||
if callback:
|
||||
return Action(name=name, callback=callback.unwrapped_callback)
|
||||
return None
|
||||
|
||||
def _print_closing_message(self, args, actions):
|
||||
def _print_closing_message(self, args: PropertyDict, actions: _OrderedDictKeysView) -> None:
|
||||
# print a closing message of some kind
|
||||
#
|
||||
if any(t in str(actions) for t in ('flash', 'dfu', 'uf2', 'uf2-app')):
|
||||
@ -479,11 +467,13 @@ def init_cli(verbose_output=None):
|
||||
|
||||
# Otherwise, if we built any binaries print a message about
|
||||
# how to flash them
|
||||
def print_flashing_message(title, key):
|
||||
with open(os.path.join(args.build_dir, 'flasher_args.json')) as f:
|
||||
flasher_args = json.load(f)
|
||||
def print_flashing_message(title: str, key: str) -> None:
|
||||
with open(os.path.join(args.build_dir, 'flasher_args.json')) as file:
|
||||
flasher_args: Dict[str, Any] = json.load(file)
|
||||
|
||||
def flasher_path(f):
|
||||
def flasher_path(f: Union[str, os.PathLike[str]]) -> str:
|
||||
if type(args.build_dir) is bytes:
|
||||
args.build_dir = args.build_dir.decode()
|
||||
return _safe_relpath(os.path.join(args.build_dir, f))
|
||||
|
||||
if key != 'project': # flashing a single item
|
||||
@ -536,11 +526,11 @@ def init_cli(verbose_output=None):
|
||||
if 'bootloader' in actions:
|
||||
print_flashing_message('Bootloader', 'bootloader')
|
||||
|
||||
def execute_tasks(self, tasks, **kwargs):
|
||||
def execute_tasks(self, tasks: List, **kwargs: str) -> OrderedDict:
|
||||
ctx = click.get_current_context()
|
||||
global_args = PropertyDict(kwargs)
|
||||
|
||||
def _help_and_exit():
|
||||
def _help_and_exit() -> None:
|
||||
print(ctx.get_help())
|
||||
ctx.exit()
|
||||
|
||||
@ -592,7 +582,7 @@ def init_cli(verbose_output=None):
|
||||
_help_and_exit()
|
||||
|
||||
# Build full list of tasks to and deal with dependencies and order dependencies
|
||||
tasks_to_run = OrderedDict()
|
||||
tasks_to_run: OrderedDict = OrderedDict()
|
||||
while tasks:
|
||||
task = tasks[0]
|
||||
tasks_dict = dict([(t.name, t) for t in tasks])
|
||||
@ -661,13 +651,13 @@ def init_cli(verbose_output=None):
|
||||
},
|
||||
)
|
||||
@click.option('-C', '--project-dir', default=os.getcwd(), type=click.Path())
|
||||
def parse_project_dir(project_dir):
|
||||
def parse_project_dir(project_dir: str) -> Any:
|
||||
return realpath(project_dir)
|
||||
|
||||
# Set `complete_var` to not existing environment variable name to prevent early cmd completion
|
||||
project_dir = parse_project_dir(standalone_mode=False, complete_var='_IDF.PY_COMPLETE_NOT_EXISTING')
|
||||
|
||||
all_actions = {}
|
||||
all_actions: Dict = {}
|
||||
# Load extensions from components dir
|
||||
idf_py_extensions_path = os.path.join(os.environ['IDF_PATH'], 'tools', 'idf_py_actions')
|
||||
extension_dirs = [realpath(idf_py_extensions_path)]
|
||||
@ -730,12 +720,12 @@ def init_cli(verbose_output=None):
|
||||
return CLI(help=cli_help, verbose_output=verbose_output, all_actions=all_actions)
|
||||
|
||||
|
||||
def signal_handler(_signal, _frame):
|
||||
def signal_handler(_signal: int, _frame: Optional[FrameType]) -> None:
|
||||
# The Ctrl+C processed by other threads inside
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
def main() -> None:
|
||||
# Processing of Ctrl+C event for all threads made by main()
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
@ -753,7 +743,7 @@ def main():
|
||||
cli(sys.argv[1:], prog_name=PROG, complete_var=SHELL_COMPLETE_VAR)
|
||||
|
||||
|
||||
def _valid_unicode_config():
|
||||
def _valid_unicode_config() -> Union[codecs.CodecInfo, bool]:
|
||||
# Python 2 is always good
|
||||
if sys.version_info[0] == 2:
|
||||
return True
|
||||
@ -765,15 +755,13 @@ def _valid_unicode_config():
|
||||
return False
|
||||
|
||||
|
||||
def _find_usable_locale():
|
||||
def _find_usable_locale() -> str:
|
||||
try:
|
||||
locales = subprocess.Popen(['locale', '-a'], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
|
||||
locales = subprocess.Popen(['locale', '-a'], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0].decode('ascii', 'replace')
|
||||
except OSError:
|
||||
locales = ''
|
||||
if isinstance(locales, bytes):
|
||||
locales = locales.decode('ascii', 'replace')
|
||||
|
||||
usable_locales = []
|
||||
usable_locales: List[str] = []
|
||||
for line in locales.splitlines():
|
||||
locale = line.strip()
|
||||
locale_name = locale.lower().replace('-', '')
|
||||
|
@ -7,20 +7,22 @@ import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
from typing import Any, Dict, List, Optional
|
||||
from urllib.error import URLError
|
||||
from urllib.request import Request, urlopen
|
||||
from webbrowser import open_new_tab
|
||||
|
||||
import click
|
||||
from click.core import Context
|
||||
from idf_py_actions.constants import GENERATORS, PREVIEW_TARGETS, SUPPORTED_TARGETS, URL_TO_DOC
|
||||
from idf_py_actions.errors import FatalError
|
||||
from idf_py_actions.global_options import global_options
|
||||
from idf_py_actions.tools import (TargetChoice, ensure_build_directory, get_target, idf_version, merge_action_lists,
|
||||
realpath, run_target)
|
||||
from idf_py_actions.tools import (PropertyDict, TargetChoice, ensure_build_directory, get_target, idf_version,
|
||||
merge_action_lists, realpath, run_target)
|
||||
|
||||
|
||||
def action_extensions(base_actions, project_path):
|
||||
def build_target(target_name, ctx, args):
|
||||
def action_extensions(base_actions: Dict, project_path: str) -> Any:
|
||||
def build_target(target_name: str, ctx: Context, args: PropertyDict) -> None:
|
||||
"""
|
||||
Execute the target build system to build target 'target_name'
|
||||
|
||||
@ -30,7 +32,7 @@ def action_extensions(base_actions, project_path):
|
||||
ensure_build_directory(args, ctx.info_name)
|
||||
run_target(target_name, args)
|
||||
|
||||
def size_target(target_name, ctx, args):
|
||||
def size_target(target_name: str, ctx: Context, args: PropertyDict) -> None:
|
||||
"""
|
||||
Builds the app and then executes a size-related target passed in 'target_name'.
|
||||
`tool_error_handler` handler is used to suppress errors during the build,
|
||||
@ -38,18 +40,18 @@ def action_extensions(base_actions, project_path):
|
||||
|
||||
"""
|
||||
|
||||
def tool_error_handler(e):
|
||||
def tool_error_handler(e: int) -> None:
|
||||
pass
|
||||
|
||||
ensure_build_directory(args, ctx.info_name)
|
||||
run_target('all', args, custom_error_handler=tool_error_handler)
|
||||
run_target(target_name, args)
|
||||
|
||||
def list_build_system_targets(target_name, ctx, args):
|
||||
def list_build_system_targets(target_name: str, ctx: Context, args: PropertyDict) -> None:
|
||||
"""Shows list of targets known to build sytem (make/ninja)"""
|
||||
build_target('help', ctx, args)
|
||||
|
||||
def menuconfig(target_name, ctx, args, style):
|
||||
def menuconfig(target_name: str, ctx: Context, args: PropertyDict, style: str) -> None:
|
||||
"""
|
||||
Menuconfig target is build_target extended with the style argument for setting the value for the environment
|
||||
variable.
|
||||
@ -61,7 +63,7 @@ def action_extensions(base_actions, project_path):
|
||||
os.environ['MENUCONFIG_STYLE'] = style
|
||||
build_target(target_name, ctx, args)
|
||||
|
||||
def fallback_target(target_name, ctx, args):
|
||||
def fallback_target(target_name: str, ctx: Context, args: PropertyDict) -> None:
|
||||
"""
|
||||
Execute targets that are not explicitly known to idf.py
|
||||
"""
|
||||
@ -80,42 +82,22 @@ def action_extensions(base_actions, project_path):
|
||||
|
||||
run_target(target_name, args)
|
||||
|
||||
def verbose_callback(ctx, param, value):
|
||||
def verbose_callback(ctx: Context, param: List, value: str) -> Optional[str]:
|
||||
if not value or ctx.resilient_parsing:
|
||||
return
|
||||
return None
|
||||
|
||||
for line in ctx.command.verbose_output:
|
||||
print(line)
|
||||
|
||||
return value
|
||||
|
||||
def clean(action, ctx, args):
|
||||
def clean(action: str, ctx: Context, args: PropertyDict) -> None:
|
||||
if not os.path.isdir(args.build_dir):
|
||||
print("Build directory '%s' not found. Nothing to clean." % args.build_dir)
|
||||
return
|
||||
build_target('clean', ctx, args)
|
||||
|
||||
def _delete_windows_symlinks(directory):
|
||||
"""
|
||||
It deletes symlinks recursively on Windows. It is useful for Python 2 which doesn't detect symlinks on Windows.
|
||||
"""
|
||||
deleted_paths = []
|
||||
if os.name == 'nt':
|
||||
import ctypes
|
||||
|
||||
for root, dirnames, _filenames in os.walk(directory):
|
||||
for d in dirnames:
|
||||
full_path = os.path.join(root, d)
|
||||
try:
|
||||
full_path = full_path.decode('utf-8')
|
||||
except Exception:
|
||||
pass
|
||||
if ctypes.windll.kernel32.GetFileAttributesW(full_path) & 0x0400:
|
||||
os.rmdir(full_path)
|
||||
deleted_paths.append(full_path)
|
||||
return deleted_paths
|
||||
|
||||
def fullclean(action, ctx, args):
|
||||
def fullclean(action: str, ctx: Context, args: PropertyDict) -> None:
|
||||
build_dir = args.build_dir
|
||||
if not os.path.isdir(build_dir):
|
||||
print("Build directory '%s' not found. Nothing to clean." % build_dir)
|
||||
@ -135,13 +117,8 @@ def action_extensions(base_actions, project_path):
|
||||
raise FatalError(
|
||||
"Refusing to automatically delete files in directory containing '%s'. Delete files manually if you're sure."
|
||||
% red)
|
||||
# OK, delete everything in the build directory...
|
||||
# Note: Python 2.7 doesn't detect symlinks on Windows (it is supported form 3.2). Tools promising to not
|
||||
# follow symlinks will actually follow them. Deleting the build directory with symlinks deletes also items
|
||||
# outside of this directory.
|
||||
deleted_symlinks = _delete_windows_symlinks(build_dir)
|
||||
if args.verbose and len(deleted_symlinks) > 1:
|
||||
print('The following symlinks were identified and removed:\n%s' % '\n'.join(deleted_symlinks))
|
||||
if args.verbose and len(build_dir) > 1:
|
||||
print('The following symlinks were identified and removed:\n%s' % '\n'.join(build_dir))
|
||||
for f in os.listdir(build_dir): # TODO: once we are Python 3 only, this can be os.scandir()
|
||||
f = os.path.join(build_dir, f)
|
||||
if args.verbose:
|
||||
@ -151,7 +128,7 @@ def action_extensions(base_actions, project_path):
|
||||
else:
|
||||
os.remove(f)
|
||||
|
||||
def python_clean(action, ctx, args):
|
||||
def python_clean(action: str, ctx: Context, args: PropertyDict) -> None:
|
||||
for root, dirnames, filenames in os.walk(os.environ['IDF_PATH']):
|
||||
for d in dirnames:
|
||||
if d == '__pycache__':
|
||||
@ -165,7 +142,7 @@ def action_extensions(base_actions, project_path):
|
||||
print('Removing: %s' % file_to_delete)
|
||||
os.remove(file_to_delete)
|
||||
|
||||
def set_target(action, ctx, args, idf_target):
|
||||
def set_target(action: str, ctx: Context, args: PropertyDict, idf_target: str) -> None:
|
||||
if (not args['preview'] and idf_target in PREVIEW_TARGETS):
|
||||
raise FatalError(
|
||||
"%s is still in preview. You have to append '--preview' option after idf.py to use any preview feature."
|
||||
@ -180,10 +157,10 @@ def action_extensions(base_actions, project_path):
|
||||
print('Set Target to: %s, new sdkconfig created. Existing sdkconfig renamed to sdkconfig.old.' % idf_target)
|
||||
ensure_build_directory(args, ctx.info_name, True)
|
||||
|
||||
def reconfigure(action, ctx, args):
|
||||
def reconfigure(action: str, ctx: Context, args: PropertyDict) -> None:
|
||||
ensure_build_directory(args, ctx.info_name, True)
|
||||
|
||||
def validate_root_options(ctx, args, tasks):
|
||||
def validate_root_options(ctx: Context, args: PropertyDict, tasks: List) -> None:
|
||||
args.project_dir = realpath(args.project_dir)
|
||||
if args.build_dir is not None and args.project_dir == realpath(args.build_dir):
|
||||
raise FatalError(
|
||||
@ -193,7 +170,7 @@ def action_extensions(base_actions, project_path):
|
||||
args.build_dir = os.path.join(args.project_dir, 'build')
|
||||
args.build_dir = realpath(args.build_dir)
|
||||
|
||||
def idf_version_callback(ctx, param, value):
|
||||
def idf_version_callback(ctx: Context, param: str, value: str) -> None:
|
||||
if not value or ctx.resilient_parsing:
|
||||
return
|
||||
|
||||
@ -205,7 +182,7 @@ def action_extensions(base_actions, project_path):
|
||||
print('ESP-IDF %s' % version)
|
||||
sys.exit(0)
|
||||
|
||||
def list_targets_callback(ctx, param, value):
|
||||
def list_targets_callback(ctx: Context, param: List, value: int) -> None:
|
||||
if not value or ctx.resilient_parsing:
|
||||
return
|
||||
|
||||
@ -218,12 +195,13 @@ def action_extensions(base_actions, project_path):
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
def show_docs(action, ctx, args, no_browser, language, starting_page, version, target):
|
||||
def show_docs(action: str, ctx: Context, args: PropertyDict, no_browser: bool, language: str, starting_page: str, version: str, target: str) -> None:
|
||||
if language == 'cn':
|
||||
language = 'zh_CN'
|
||||
if not version:
|
||||
# '0.0-dev' here because if 'dev' in version it will transform in to 'latest'
|
||||
version = re.search(r'v\d+\.\d+\.?\d*(-dev|-beta\d|-rc)?', idf_version() or '0.0-dev').group()
|
||||
version_search = re.search(r'v\d+\.\d+\.?\d*(-dev|-beta\d|-rc)?', idf_version() or '0.0-dev')
|
||||
version = version_search.group() if version_search else 'latest'
|
||||
if 'dev' in version:
|
||||
version = 'latest'
|
||||
elif version[0] != 'v':
|
||||
@ -249,7 +227,7 @@ def action_extensions(base_actions, project_path):
|
||||
print(link)
|
||||
sys.exit(0)
|
||||
|
||||
def get_default_language():
|
||||
def get_default_language() -> str:
|
||||
try:
|
||||
language = 'zh_CN' if locale.getdefaultlocale()[0] == 'zh_CN' else 'en'
|
||||
except ValueError:
|
||||
|
@ -6,13 +6,17 @@ import os
|
||||
import re
|
||||
import sys
|
||||
from distutils.dir_util import copy_tree
|
||||
from typing import Dict
|
||||
|
||||
import click
|
||||
from idf_py_actions.tools import PropertyDict
|
||||
|
||||
|
||||
def get_type(action):
|
||||
def get_type(action: str) -> str:
|
||||
return action.split('-')[1]
|
||||
|
||||
|
||||
def replace_in_file(filename, pattern, replacement):
|
||||
def replace_in_file(filename: str, pattern: str, replacement: str) -> None:
|
||||
with open(filename, 'r+') as f:
|
||||
content = f.read()
|
||||
overwritten_content = re.sub(pattern, replacement, content, flags=re.M)
|
||||
@ -21,7 +25,7 @@ def replace_in_file(filename, pattern, replacement):
|
||||
f.truncate()
|
||||
|
||||
|
||||
def is_empty_and_create(path, action):
|
||||
def is_empty_and_create(path: str, action: str) -> None:
|
||||
abspath = os.path.abspath(path)
|
||||
if not os.path.exists(abspath):
|
||||
os.makedirs(abspath)
|
||||
@ -35,7 +39,7 @@ def is_empty_and_create(path, action):
|
||||
sys.exit(3)
|
||||
|
||||
|
||||
def create_project(target_path, name):
|
||||
def create_project(target_path: str, name: str) -> None:
|
||||
copy_tree(os.path.join(os.environ['IDF_PATH'], 'examples', 'get-started', 'sample_project'), target_path)
|
||||
main_folder = os.path.join(target_path, 'main')
|
||||
os.rename(os.path.join(main_folder, 'main.c'), os.path.join(main_folder, '.'.join((name, 'c'))))
|
||||
@ -44,7 +48,7 @@ def create_project(target_path, name):
|
||||
os.remove(os.path.join(target_path, 'README.md'))
|
||||
|
||||
|
||||
def create_component(target_path, name):
|
||||
def create_component(target_path: str, name: str) -> None:
|
||||
copy_tree(os.path.join(os.environ['IDF_PATH'], 'tools', 'templates', 'sample_component'), target_path)
|
||||
os.rename(os.path.join(target_path, 'main.c'), os.path.join(target_path, '.'.join((name, 'c'))))
|
||||
os.rename(os.path.join(target_path, 'include', 'main.h'),
|
||||
@ -54,8 +58,8 @@ def create_component(target_path, name):
|
||||
replace_in_file(os.path.join(target_path, 'CMakeLists.txt'), 'main', name)
|
||||
|
||||
|
||||
def action_extensions(base_actions, project_path):
|
||||
def create_new(action, ctx, global_args, **action_args):
|
||||
def action_extensions(base_actions: Dict, project_path: str) -> Dict:
|
||||
def create_new(action: str, ctx: click.core.Context, global_args: PropertyDict, **action_args: str) -> Dict:
|
||||
target_path = action_args.get('path') or os.path.join(project_path, action_args['name'])
|
||||
|
||||
is_empty_and_create(target_path, action)
|
||||
|
@ -9,21 +9,22 @@ import sys
|
||||
import threading
|
||||
import time
|
||||
from threading import Thread
|
||||
from typing import Any, Dict, List
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from click.core import Context
|
||||
from idf_py_actions.errors import FatalError
|
||||
from idf_py_actions.tools import ensure_build_directory
|
||||
from idf_py_actions.tools import PropertyDict, ensure_build_directory
|
||||
|
||||
PYTHON = sys.executable
|
||||
|
||||
|
||||
def action_extensions(base_actions, project_path):
|
||||
def action_extensions(base_actions: Dict, project_path: str) -> Dict:
|
||||
OPENOCD_OUT_FILE = 'openocd_out.txt'
|
||||
GDBGUI_OUT_FILE = 'gdbgui_out.txt'
|
||||
# Internal dictionary of currently active processes, threads and their output files
|
||||
processes = {'threads_to_join': [], 'openocd_issues': None}
|
||||
processes: Dict = {'threads_to_join': [], 'openocd_issues': None}
|
||||
|
||||
def _check_for_common_openocd_issues(file_name, print_all=True):
|
||||
def _check_for_common_openocd_issues(file_name: str, print_all: bool=True) -> Any:
|
||||
if processes['openocd_issues'] is not None:
|
||||
return processes['openocd_issues']
|
||||
try:
|
||||
@ -39,7 +40,7 @@ def action_extensions(base_actions, project_path):
|
||||
processes['openocd_issues'] = message
|
||||
return message
|
||||
|
||||
def _check_openocd_errors(fail_if_openocd_failed, target, ctx):
|
||||
def _check_openocd_errors(fail_if_openocd_failed: Dict, target: str, ctx: Context) -> None:
|
||||
if fail_if_openocd_failed:
|
||||
if 'openocd' in processes and processes['openocd'] is not None:
|
||||
p = processes['openocd']
|
||||
@ -62,7 +63,7 @@ def action_extensions(base_actions, project_path):
|
||||
# OpenOCD exited or error message detected -> print possible output and terminate
|
||||
raise FatalError('Action "{}" failed due to errors in OpenOCD:\n{}'.format(target, _check_for_common_openocd_issues(name)), ctx)
|
||||
|
||||
def _terminate_async_target(target):
|
||||
def _terminate_async_target(target: str) -> None:
|
||||
if target in processes and processes[target] is not None:
|
||||
try:
|
||||
if target + '_outfile' in processes:
|
||||
@ -86,11 +87,11 @@ def action_extensions(base_actions, project_path):
|
||||
print('Failed to close/kill {}'.format(target))
|
||||
processes[target] = None # to indicate this has ended
|
||||
|
||||
def is_gdb_with_python(gdb):
|
||||
def is_gdb_with_python(gdb: str) -> bool:
|
||||
# execute simple python command to check is it supported
|
||||
return subprocess.run([gdb, '--batch-silent', '--ex', 'python import os'], stderr=subprocess.DEVNULL).returncode == 0
|
||||
|
||||
def create_local_gdbinit(gdb, gdbinit, elf_file):
|
||||
def create_local_gdbinit(gdb: str, gdbinit: str, elf_file: str) -> None:
|
||||
with open(gdbinit, 'w') as f:
|
||||
if is_gdb_with_python(gdb):
|
||||
f.write('python\n')
|
||||
@ -107,7 +108,7 @@ def action_extensions(base_actions, project_path):
|
||||
f.write('thb app_main\n')
|
||||
f.write('c\n')
|
||||
|
||||
def debug_cleanup():
|
||||
def debug_cleanup() -> None:
|
||||
print('cleaning up debug targets')
|
||||
for t in processes['threads_to_join']:
|
||||
if threading.currentThread() != t:
|
||||
@ -116,7 +117,7 @@ def action_extensions(base_actions, project_path):
|
||||
_terminate_async_target('gdbgui')
|
||||
_terminate_async_target('gdb')
|
||||
|
||||
def post_debug(action, ctx, args, **kwargs):
|
||||
def post_debug(action: str, ctx: Context, args: PropertyDict, **kwargs: str) -> None:
|
||||
""" Deal with asynchronous targets, such as openocd running in background """
|
||||
if kwargs['block'] == 1:
|
||||
for target in ['openocd', 'gdbgui']:
|
||||
@ -143,7 +144,7 @@ def action_extensions(base_actions, project_path):
|
||||
_terminate_async_target('openocd')
|
||||
_terminate_async_target('gdbgui')
|
||||
|
||||
def get_project_desc(args, ctx):
|
||||
def get_project_desc(args: PropertyDict, ctx: Context) -> Any:
|
||||
desc_path = os.path.join(args.build_dir, 'project_description.json')
|
||||
if not os.path.exists(desc_path):
|
||||
ensure_build_directory(args, ctx.info_name)
|
||||
@ -151,7 +152,7 @@ def action_extensions(base_actions, project_path):
|
||||
project_desc = json.load(f)
|
||||
return project_desc
|
||||
|
||||
def openocd(action, ctx, args, openocd_scripts, openocd_commands):
|
||||
def openocd(action: str, ctx: Context, args: PropertyDict, openocd_scripts: Optional[str], openocd_commands: str) -> None:
|
||||
"""
|
||||
Execute openocd as external tool
|
||||
"""
|
||||
@ -188,14 +189,14 @@ def action_extensions(base_actions, project_path):
|
||||
processes['openocd_outfile_name'] = openocd_out_name
|
||||
print('OpenOCD started as a background task {}'.format(process.pid))
|
||||
|
||||
def get_gdb_args(gdbinit, project_desc: Dict[str, Any]) -> List[str]:
|
||||
def get_gdb_args(gdbinit: str, project_desc: Dict[str, Any]) -> List:
|
||||
args = ['-x={}'.format(gdbinit)]
|
||||
debug_prefix_gdbinit = project_desc.get('debug_prefix_map_gdbinit')
|
||||
if debug_prefix_gdbinit:
|
||||
args.append('-ix={}'.format(debug_prefix_gdbinit))
|
||||
return args
|
||||
|
||||
def gdbui(action, ctx, args, gdbgui_port, gdbinit, require_openocd):
|
||||
def gdbui(action: str, ctx: Context, args: PropertyDict, gdbgui_port: Optional[str], gdbinit: Optional[str], require_openocd: bool) -> None:
|
||||
"""
|
||||
Asynchronous GDB-UI target
|
||||
"""
|
||||
@ -211,8 +212,8 @@ def action_extensions(base_actions, project_path):
|
||||
# - '"-x=foo -x=bar"', would return ['foo bar']
|
||||
# - '-x=foo', would return ['-x', 'foo'] and mess up the former option '--gdb-args'
|
||||
# so for one item, use extra double quotes. for more items, use no extra double quotes.
|
||||
gdb_args = get_gdb_args(gdbinit, project_desc)
|
||||
gdb_args = '"{}"'.format(' '.join(gdb_args)) if len(gdb_args) == 1 else ' '.join(gdb_args)
|
||||
gdb_args_list = get_gdb_args(gdbinit, project_desc)
|
||||
gdb_args = '"{}"'.format(' '.join(gdb_args_list)) if len(gdb_args_list) == 1 else ' '.join(gdb_args_list)
|
||||
args = ['gdbgui', '-g', gdb, '--gdb-args', gdb_args]
|
||||
print(args)
|
||||
|
||||
@ -238,8 +239,8 @@ def action_extensions(base_actions, project_path):
|
||||
print('gdbgui started as a background task {}'.format(process.pid))
|
||||
_check_openocd_errors(fail_if_openocd_failed, action, ctx)
|
||||
|
||||
def global_callback(ctx, global_args, tasks):
|
||||
def move_to_front(task_name):
|
||||
def global_callback(ctx: Context, global_args: PropertyDict, tasks: List) -> None:
|
||||
def move_to_front(task_name: str) -> None:
|
||||
for index, task in enumerate(tasks):
|
||||
if task.name == task_name:
|
||||
tasks.insert(0, tasks.pop(index))
|
||||
@ -264,18 +265,18 @@ def action_extensions(base_actions, project_path):
|
||||
if task.name in ('gdb', 'gdbgui', 'gdbtui'):
|
||||
task.action_args['require_openocd'] = True
|
||||
|
||||
def run_gdb(gdb_args):
|
||||
def run_gdb(gdb_args: List) -> int:
|
||||
p = subprocess.Popen(gdb_args)
|
||||
processes['gdb'] = p
|
||||
return p.wait()
|
||||
|
||||
def gdbtui(action, ctx, args, gdbinit, require_openocd):
|
||||
def gdbtui(action: str, ctx: Context, args: PropertyDict, gdbinit: str, require_openocd: bool) -> None:
|
||||
"""
|
||||
Synchronous GDB target with text ui mode
|
||||
"""
|
||||
gdb(action, ctx, args, 1, gdbinit, require_openocd)
|
||||
|
||||
def gdb(action, ctx, args, gdb_tui, gdbinit, require_openocd):
|
||||
def gdb(action: str, ctx: Context, args: PropertyDict, gdb_tui: Optional[int], gdbinit: Optional[str], require_openocd: bool) -> None:
|
||||
"""
|
||||
Synchronous GDB target
|
||||
"""
|
||||
|
@ -1,22 +1,25 @@
|
||||
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
from typing import Dict
|
||||
|
||||
from click.core import Context
|
||||
from idf_py_actions.errors import FatalError
|
||||
from idf_py_actions.tools import ensure_build_directory, is_target_supported, run_target
|
||||
from idf_py_actions.tools import PropertyDict, ensure_build_directory, is_target_supported, run_target
|
||||
|
||||
|
||||
def action_extensions(base_actions, project_path):
|
||||
def action_extensions(base_actions: Dict, project_path: str) -> Dict:
|
||||
|
||||
SUPPORTED_TARGETS = ['esp32s2']
|
||||
|
||||
def dfu_target(target_name, ctx, args, part_size):
|
||||
def dfu_target(target_name: str, ctx: Context, args: PropertyDict, part_size: str) -> None:
|
||||
ensure_build_directory(args, ctx.info_name)
|
||||
run_target(target_name, args, {'ESP_DFU_PART_SIZE': part_size} if part_size else {})
|
||||
|
||||
def dfu_list_target(target_name, ctx, args):
|
||||
def dfu_list_target(target_name: str, ctx: Context, args: PropertyDict) -> None:
|
||||
ensure_build_directory(args, ctx.info_name)
|
||||
run_target(target_name, args)
|
||||
|
||||
def dfu_flash_target(target_name, ctx, args, path):
|
||||
def dfu_flash_target(target_name: str, ctx: Context, args: PropertyDict, path: str) -> None:
|
||||
ensure_build_directory(args, ctx.info_name)
|
||||
|
||||
try:
|
||||
|
@ -1,11 +1,14 @@
|
||||
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
from click.core import Context
|
||||
|
||||
|
||||
class FatalError(RuntimeError):
|
||||
"""
|
||||
Wrapper class for runtime errors that aren't caused by bugs in idf.py or the build process.
|
||||
"""
|
||||
|
||||
def __init__(self, message, ctx=None):
|
||||
def __init__(self, message: str, ctx: Context=None):
|
||||
super(RuntimeError, self).__init__(message)
|
||||
# if context is defined, check for the cleanup tasks
|
||||
if ctx is not None and 'cleanup' in ctx.meta:
|
||||
|
@ -4,18 +4,19 @@
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import click
|
||||
from idf_monitor_base.output_helpers import yellow_print
|
||||
from idf_py_actions.errors import FatalError, NoSerialPortFoundError
|
||||
from idf_py_actions.global_options import global_options
|
||||
from idf_py_actions.tools import ensure_build_directory, get_sdkconfig_value, run_target, run_tool
|
||||
from idf_py_actions.tools import PropertyDict, ensure_build_directory, get_sdkconfig_value, run_target, run_tool
|
||||
|
||||
PYTHON = sys.executable
|
||||
|
||||
|
||||
def action_extensions(base_actions, project_path):
|
||||
def _get_project_desc(ctx, args):
|
||||
def action_extensions(base_actions: Dict, project_path: str) -> Dict:
|
||||
def _get_project_desc(ctx: click.core.Context, args: PropertyDict) -> Any:
|
||||
desc_path = os.path.join(args.build_dir, 'project_description.json')
|
||||
if not os.path.exists(desc_path):
|
||||
ensure_build_directory(args, ctx.info_name)
|
||||
@ -23,7 +24,7 @@ def action_extensions(base_actions, project_path):
|
||||
project_desc = json.load(f)
|
||||
return project_desc
|
||||
|
||||
def _get_default_serial_port(args):
|
||||
def _get_default_serial_port(args: PropertyDict) -> Any:
|
||||
# Import is done here in order to move it after the check_environment() ensured that pyserial has been installed
|
||||
try:
|
||||
import esptool
|
||||
@ -45,7 +46,7 @@ def action_extensions(base_actions, project_path):
|
||||
except Exception as e:
|
||||
raise FatalError('An exception occurred during detection of the serial port: {}'.format(e))
|
||||
|
||||
def _get_esptool_args(args):
|
||||
def _get_esptool_args(args: PropertyDict) -> List:
|
||||
esptool_path = os.path.join(os.environ['IDF_PATH'], 'components/esptool_py/esptool/esptool.py')
|
||||
esptool_wrapper_path = os.environ.get('ESPTOOL_WRAPPER', '')
|
||||
if args.port is None:
|
||||
@ -68,7 +69,7 @@ def action_extensions(base_actions, project_path):
|
||||
result += ['--no-stub']
|
||||
return result
|
||||
|
||||
def _get_commandline_options(ctx):
|
||||
def _get_commandline_options(ctx: click.core.Context) -> List:
|
||||
""" Return all the command line options up to first action """
|
||||
# This approach ignores argument parsing done Click
|
||||
result = []
|
||||
@ -81,7 +82,8 @@ def action_extensions(base_actions, project_path):
|
||||
|
||||
return result
|
||||
|
||||
def monitor(action, ctx, args, print_filter, monitor_baud, encrypted, no_reset, timestamps, timestamp_format):
|
||||
def monitor(action: str, ctx: click.core.Context, args: PropertyDict, print_filter: str, monitor_baud: str, encrypted: bool,
|
||||
no_reset: bool, timestamps: bool, timestamp_format: str) -> None:
|
||||
"""
|
||||
Run idf_monitor.py to watch build output
|
||||
"""
|
||||
@ -152,7 +154,7 @@ def action_extensions(base_actions, project_path):
|
||||
|
||||
run_tool('idf_monitor', monitor_args, args.project_dir)
|
||||
|
||||
def flash(action, ctx, args):
|
||||
def flash(action: str, ctx: click.core.Context, args: PropertyDict) -> None:
|
||||
"""
|
||||
Run esptool to flash the entire project, from an argfile generated by the build system
|
||||
"""
|
||||
@ -165,13 +167,13 @@ def action_extensions(base_actions, project_path):
|
||||
esp_port = args.port or _get_default_serial_port(args)
|
||||
run_target(action, args, {'ESPBAUD': str(args.baud), 'ESPPORT': esp_port})
|
||||
|
||||
def erase_flash(action, ctx, args):
|
||||
def erase_flash(action: str, ctx: click.core.Context, args: PropertyDict) -> None:
|
||||
ensure_build_directory(args, ctx.info_name)
|
||||
esptool_args = _get_esptool_args(args)
|
||||
esptool_args += ['erase_flash']
|
||||
run_tool('esptool.py', esptool_args, args.build_dir)
|
||||
|
||||
def global_callback(ctx, global_args, tasks):
|
||||
def global_callback(ctx: click.core.Context, global_args: Dict, tasks: PropertyDict) -> None:
|
||||
encryption = any([task.name in ('encrypted-flash', 'encrypted-app-flash') for task in tasks])
|
||||
if encryption:
|
||||
for task in tasks:
|
||||
@ -179,7 +181,7 @@ def action_extensions(base_actions, project_path):
|
||||
task.action_args['encrypted'] = True
|
||||
break
|
||||
|
||||
def ota_targets(target_name, ctx, args):
|
||||
def ota_targets(target_name: str, ctx: click.core.Context, args: PropertyDict) -> None:
|
||||
"""
|
||||
Execute the target build system to build target 'target_name'.
|
||||
Additionally set global variables for baud and port.
|
||||
|
@ -5,6 +5,7 @@ import re
|
||||
import subprocess
|
||||
import sys
|
||||
from io import open
|
||||
from typing import Any, List
|
||||
|
||||
import click
|
||||
|
||||
@ -340,12 +341,12 @@ class TargetChoice(click.Choice):
|
||||
- ignores hyphens
|
||||
- not case sensitive
|
||||
"""
|
||||
def __init__(self, choices):
|
||||
def __init__(self, choices: List) -> None:
|
||||
super(TargetChoice, self).__init__(choices, case_sensitive=False)
|
||||
|
||||
def convert(self, value, param, ctx):
|
||||
def normalize(str):
|
||||
return str.lower().replace('-', '')
|
||||
def convert(self, value: Any, param: click.Parameter, ctx: click.Context) -> Any:
|
||||
def normalize(string: str) -> str:
|
||||
return string.lower().replace('-', '')
|
||||
|
||||
saved_token_normalize_func = ctx.token_normalize_func
|
||||
ctx.token_normalize_func = normalize
|
||||
@ -354,3 +355,20 @@ class TargetChoice(click.Choice):
|
||||
return super(TargetChoice, self).convert(value, param, ctx)
|
||||
finally:
|
||||
ctx.token_normalize_func = saved_token_normalize_func
|
||||
|
||||
|
||||
class PropertyDict(dict):
|
||||
def __getattr__(self, name: str) -> Any:
|
||||
if name in self:
|
||||
return self[name]
|
||||
else:
|
||||
raise AttributeError("'PropertyDict' object has no attribute '%s'" % name)
|
||||
|
||||
def __setattr__(self, name: str, value: Any) -> None:
|
||||
self[name] = value
|
||||
|
||||
def __delattr__(self, name: str) -> None:
|
||||
if name in self:
|
||||
del self[name]
|
||||
else:
|
||||
raise AttributeError("'PropertyDict' object has no attribute '%s'" % name)
|
||||
|
@ -1,10 +1,13 @@
|
||||
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
from idf_py_actions.tools import ensure_build_directory, run_target
|
||||
from typing import Dict, List
|
||||
|
||||
from click.core import Context
|
||||
from idf_py_actions.tools import PropertyDict, ensure_build_directory, run_target
|
||||
|
||||
|
||||
def action_extensions(base_actions, project_path):
|
||||
def uf2_target(target_name, ctx, args):
|
||||
def action_extensions(base_actions: Dict, project_path: List) -> Dict:
|
||||
def uf2_target(target_name: str, ctx: Context, args: PropertyDict) -> None:
|
||||
ensure_build_directory(args, ctx.info_name)
|
||||
run_target(target_name, args)
|
||||
|
||||
|
@ -11,11 +11,12 @@ import json
|
||||
import os
|
||||
import struct
|
||||
from functools import partial
|
||||
from typing import Dict, List
|
||||
|
||||
from future.utils import iteritems
|
||||
|
||||
|
||||
def round_up_int_div(n, d):
|
||||
def round_up_int_div(n: int, d: int) -> int:
|
||||
# equivalent to math.ceil(n / d)
|
||||
return (n + d - 1) // d
|
||||
|
||||
@ -32,23 +33,23 @@ class UF2Writer(object):
|
||||
UF2_FLAG_FAMILYID_PRESENT = 0x00002000
|
||||
UF2_FLAG_MD5_PRESENT = 0x00004000
|
||||
|
||||
def __init__(self, chip_id, output_file, chunk_size):
|
||||
def __init__(self, chip_id: int, output_file: os.PathLike, chunk_size: int) -> None:
|
||||
self.chip_id = chip_id
|
||||
self.CHUNK_SIZE = self.UF2_DATA_SIZE - self.UF2_MD5_PART_SIZE if chunk_size is None else chunk_size
|
||||
self.f = open(output_file, 'wb')
|
||||
|
||||
def __enter__(self):
|
||||
def __enter__(self) -> 'UF2Writer':
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
def __exit__(self, exc_type: str, exc_val: int, exc_tb: List) -> None:
|
||||
if self.f:
|
||||
self.f.close()
|
||||
|
||||
@staticmethod
|
||||
def _to_uint32(num):
|
||||
def _to_uint32(num: int) -> bytes:
|
||||
return struct.pack('<I', num)
|
||||
|
||||
def _write_block(self, addr, chunk, len_chunk, block_no, blocks):
|
||||
def _write_block(self, addr: int, chunk: bytes, len_chunk: int, block_no: int, blocks: int) -> None:
|
||||
assert len_chunk > 0
|
||||
assert len_chunk <= self.CHUNK_SIZE
|
||||
assert block_no < blocks
|
||||
@ -73,7 +74,7 @@ class UF2Writer(object):
|
||||
assert len(block) == self.UF2_BLOCK_SIZE
|
||||
self.f.write(block)
|
||||
|
||||
def add_file(self, addr, f_path):
|
||||
def add_file(self, addr: int, f_path: os.PathLike) -> None:
|
||||
blocks = round_up_int_div(os.path.getsize(f_path), self.CHUNK_SIZE)
|
||||
with open(f_path, 'rb') as fin:
|
||||
a = addr
|
||||
@ -83,7 +84,7 @@ class UF2Writer(object):
|
||||
a += len_chunk
|
||||
|
||||
|
||||
def action_write(args):
|
||||
def action_write(args: Dict) -> None:
|
||||
with UF2Writer(args['chip_id'], args['output_file'], args['chunk_size']) as writer:
|
||||
for addr, f in args['files']:
|
||||
print('Adding {} at {:#x}'.format(f, addr))
|
||||
@ -91,19 +92,19 @@ def action_write(args):
|
||||
print('"{}" has been written.'.format(args['output_file']))
|
||||
|
||||
|
||||
def main():
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser()
|
||||
|
||||
def four_byte_aligned(integer):
|
||||
def four_byte_aligned(integer: int) -> bool:
|
||||
return integer & 3 == 0
|
||||
|
||||
def parse_chunk_size(string):
|
||||
def parse_chunk_size(string: str) -> int:
|
||||
num = int(string, 0)
|
||||
if not four_byte_aligned(num):
|
||||
raise argparse.ArgumentTypeError('Chunk size should be a 4-byte aligned number')
|
||||
return num
|
||||
|
||||
def parse_chip_id(string):
|
||||
def parse_chip_id(string: str) -> int:
|
||||
num = int(string, 16)
|
||||
if num < 0 or num > 0xFFFFFFFF:
|
||||
raise argparse.ArgumentTypeError('Chip ID should be a 4-byte unsigned integer')
|
||||
@ -137,12 +138,12 @@ def main():
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
def check_file(file_name):
|
||||
def check_file(file_name: str) -> str:
|
||||
if not os.path.isfile(file_name):
|
||||
raise RuntimeError('{} is not a regular file!'.format(file_name))
|
||||
return file_name
|
||||
|
||||
def parse_addr(string):
|
||||
def parse_addr(string: str) -> int:
|
||||
num = int(string, 0)
|
||||
if not four_byte_aligned(num):
|
||||
raise RuntimeError('{} is not a 4-byte aligned valid address'.format(string))
|
||||
@ -155,7 +156,7 @@ def main():
|
||||
if args.json:
|
||||
json_dir = os.path.dirname(os.path.abspath(args.json))
|
||||
|
||||
def process_json_file(path):
|
||||
def process_json_file(path: str) -> str:
|
||||
'''
|
||||
The input path is relative to json_dir. This function makes it relative to the current working
|
||||
directory.
|
||||
|
Loading…
Reference in New Issue
Block a user