Tools: extracted functionality out of idf_monitor

This commit is contained in:
Martin Gaňo 2021-05-14 11:20:38 +02:00 committed by bot
parent b5f9149399
commit 2452dc57f0
13 changed files with 555 additions and 456 deletions

View File

@ -32,43 +32,19 @@
from __future__ import division, print_function, unicode_literals from __future__ import division, print_function, unicode_literals
import argparse
import codecs import codecs
import os import os
import re import re
import subprocess
import threading import threading
import time import time
from builtins import bytes, object from builtins import bytes, object
try: try:
from typing import List, Optional, Union # noqa from typing import Any, List, Optional, Union # noqa
except ImportError: except ImportError:
pass pass
from idf_monitor_base.chip_specific_config import get_chip_config import queue
from idf_monitor_base.console_parser import ConsoleParser, prompt_next_action
from idf_monitor_base.console_reader import ConsoleReader
from idf_monitor_base.constants import (CMD_APP_FLASH, CMD_ENTER_BOOT, CMD_MAKE, CMD_OUTPUT_TOGGLE, CMD_RESET,
CMD_STOP, CMD_TOGGLE_LOGGING, CMD_TOGGLE_TIMESTAMPS, CTRL_H,
DEFAULT_PRINT_FILTER, DEFAULT_TOOLCHAIN_PREFIX, MATCH_PCADDR,
PANIC_DECODE_BACKTRACE, PANIC_DECODE_DISABLE, PANIC_END, PANIC_IDLE,
PANIC_READING, PANIC_STACK_DUMP, PANIC_START, TAG_CMD, TAG_KEY, TAG_SERIAL,
TAG_SERIAL_FLUSH)
from idf_monitor_base.coredump import COREDUMP_DECODE_DISABLE, COREDUMP_DECODE_INFO, CoreDump
from idf_monitor_base.exceptions import SerialStopException
from idf_monitor_base.gdbhelper import GDBHelper
from idf_monitor_base.line_matcher import LineMatcher
from idf_monitor_base.output_helpers import Logger, lookup_pc_address, normal_print, yellow_print
from idf_monitor_base.serial_reader import SerialReader
from idf_monitor_base.web_socket_client import WebSocketClient
from serial.tools import miniterm
try:
import queue # noqa
except ImportError:
import Queue as queue # type: ignore # noqa
import shlex import shlex
import sys import sys
@ -76,6 +52,24 @@ import serial
import serial.tools.list_ports import serial.tools.list_ports
# Windows console stuff # Windows console stuff
from idf_monitor_base.ansi_color_converter import get_converter from idf_monitor_base.ansi_color_converter import get_converter
from idf_monitor_base.argument_parser import get_parser
from idf_monitor_base.console_parser import ConsoleParser
from idf_monitor_base.console_reader import ConsoleReader
from idf_monitor_base.constants import (CTRL_C, CTRL_H, DEFAULT_PRINT_FILTER, DEFAULT_TOOLCHAIN_PREFIX,
ESPPORT_ENVIRON, EVENT_QUEUE_TIMEOUT, GDB_EXIT_TIMEOUT,
GDB_UART_CONTINUE_COMMAND, LAST_LINE_THREAD_INTERVAL, MAKEFLAGS_ENVIRON,
PANIC_DECODE_DISABLE, PANIC_IDLE, TAG_CMD, TAG_KEY, TAG_SERIAL,
TAG_SERIAL_FLUSH)
from idf_monitor_base.coredump import COREDUMP_DECODE_INFO, CoreDump
from idf_monitor_base.exceptions import SerialStopException
from idf_monitor_base.gdbhelper import GDBHelper
from idf_monitor_base.line_matcher import LineMatcher
from idf_monitor_base.logger import Logger
from idf_monitor_base.output_helpers import normal_print, yellow_print
from idf_monitor_base.serial_handler import SerialHandler, run_make
from idf_monitor_base.serial_reader import SerialReader
from idf_monitor_base.web_socket_client import WebSocketClient
from serial.tools import miniterm
key_description = miniterm.key_description key_description = miniterm.key_description
@ -111,14 +105,13 @@ class Monitor(object):
self.event_queue = queue.Queue() # type: queue.Queue self.event_queue = queue.Queue() # type: queue.Queue
self.cmd_queue = queue.Queue() # type: queue.Queue self.cmd_queue = queue.Queue() # type: queue.Queue
self.console = miniterm.Console() self.console = miniterm.Console()
self.enable_address_decoding = enable_address_decoding
sys.stderr = get_converter(sys.stderr, decode_output=True) sys.stderr = get_converter(sys.stderr, decode_output=True)
self.console.output = get_converter(self.console.output) self.console.output = get_converter(self.console.output)
self.console.byte_output = get_converter(self.console.byte_output) self.console.byte_output = get_converter(self.console.byte_output)
socket_mode = serial_instance.port.startswith( # testing hook - data from serial can make exit the monitor
'socket://') # testing hook - data from serial can make exit the monitor socket_mode = serial_instance.port.startswith('socket://')
self.serial = serial_instance self.serial = serial_instance
self.console_parser = ConsoleParser(eol) self.console_parser = ConsoleParser(eol)
@ -126,32 +119,22 @@ class Monitor(object):
socket_mode) socket_mode)
self.serial_reader = SerialReader(self.serial, self.event_queue) self.serial_reader = SerialReader(self.serial, self.event_queue)
self.elf_file = elf_file self.elf_file = elf_file
if not os.path.exists(make):
# allow for possibility the "make" arg is a list of arguments (for idf.py) # allow for possibility the "make" arg is a list of arguments (for idf.py)
self.make = shlex.split(make) # type: Union[str, List[str]] self.make = make if os.path.exists(make) else shlex.split(make) # type: Any[Union[str, List[str]], str]
else:
self.make = make
self.encrypted = encrypted
self.toolchain_prefix = toolchain_prefix
self.websocket_client = websocket_client
self.target = target self.target = target
# internal state
self._last_line_part = b''
self._pc_address_buffer = b''
self._line_matcher = LineMatcher(print_filter) self._line_matcher = LineMatcher(print_filter)
self._invoke_processing_last_line_timer = None # type: Optional[threading.Timer] self.gdb_helper = GDBHelper(toolchain_prefix, websocket_client, self.elf_file, self.serial.port,
self._force_line_print = False
self._serial_check_exit = socket_mode
self._decode_panic = decode_panic
self._reading_panic = PANIC_IDLE
self._panic_buffer = b''
self.start_cmd_sent = False
self.gdb_helper = GDBHelper(self.toolchain_prefix, self.websocket_client, self.elf_file, self.serial.port,
self.serial.baudrate) self.serial.baudrate)
self.logger = Logger(self.elf_file, self.console, timestamps, timestamp_format, b'', enable_address_decoding,
toolchain_prefix)
self.coredump = CoreDump(decode_coredumps, self.event_queue, self.logger, websocket_client, self.elf_file)
self.serial_handler = SerialHandler(b'', socket_mode, self.logger, decode_panic, PANIC_IDLE, b'', target,
False, False, self.serial, encrypted)
self.logger = Logger(self.elf_file, self.console, timestamps, timestamp_format) # internal state
self.coredump = CoreDump(decode_coredumps, self.event_queue, self.logger, self.websocket_client, self.elf_file) self._invoke_processing_last_line_timer = None # type: Optional[threading.Timer]
def invoke_processing_last_line(self): def invoke_processing_last_line(self):
# type: () -> None # type: () -> None
@ -162,18 +145,17 @@ class Monitor(object):
self.console_reader.start() self.console_reader.start()
self.serial_reader.start() self.serial_reader.start()
self.gdb_helper.gdb_exit = False self.gdb_helper.gdb_exit = False
self.start_cmd_sent = False self.serial_handler.start_cmd_sent = False
try: try:
while self.console_reader.alive and self.serial_reader.alive: while self.console_reader.alive and self.serial_reader.alive:
try: try:
if self.gdb_helper.gdb_exit: if self.gdb_helper.gdb_exit:
self.gdb_helper.gdb_exit = False self.gdb_helper.gdb_exit = False
time.sleep(GDB_EXIT_TIMEOUT)
time.sleep(0.3)
try: try:
# Continue the program after exit from the GDB # Continue the program after exit from the GDB
self.serial.write(codecs.encode('+$c#63')) self.serial.write(codecs.encode(GDB_UART_CONTINUE_COMMAND))
self.start_cmd_sent = True self.serial_handler.start_cmd_sent = True
except serial.SerialException: except serial.SerialException:
pass # this shouldn't happen, but sometimes port has closed in serial thread pass # this shouldn't happen, but sometimes port has closed in serial thread
except UnicodeEncodeError: except UnicodeEncodeError:
@ -183,14 +165,14 @@ class Monitor(object):
item = self.cmd_queue.get_nowait() item = self.cmd_queue.get_nowait()
except queue.Empty: except queue.Empty:
try: try:
item = self.event_queue.get(True, 0.03) item = self.event_queue.get(timeout=EVENT_QUEUE_TIMEOUT)
except queue.Empty: except queue.Empty:
continue continue
(event_tag, data) = item event_tag, data = item
if event_tag == TAG_CMD: if event_tag == TAG_CMD:
self.handle_commands(data, self.target) self.serial_handler.handle_commands(data, self.target, self.run_make, self.console_reader,
self.serial_reader)
elif event_tag == TAG_KEY: elif event_tag == TAG_KEY:
try: try:
self.serial.write(codecs.encode(data)) self.serial.write(codecs.encode(data))
@ -199,10 +181,12 @@ class Monitor(object):
except UnicodeEncodeError: except UnicodeEncodeError:
pass # this can happen if a non-ascii character was passed, ignoring pass # this can happen if a non-ascii character was passed, ignoring
elif event_tag == TAG_SERIAL: elif event_tag == TAG_SERIAL:
self.handle_serial_input(data) self.serial_handler.handle_serial_input(data, self.console_parser, self.coredump,
self.gdb_helper, self._line_matcher,
self.check_gdb_stub_and_run)
if self._invoke_processing_last_line_timer is not None: if self._invoke_processing_last_line_timer is not None:
self._invoke_processing_last_line_timer.cancel() self._invoke_processing_last_line_timer.cancel()
self._invoke_processing_last_line_timer = threading.Timer(0.1, self._invoke_processing_last_line_timer = threading.Timer(LAST_LINE_THREAD_INTERVAL,
self.invoke_processing_last_line) self.invoke_processing_last_line)
self._invoke_processing_last_line_timer.start() self._invoke_processing_last_line_timer.start()
# If no further data is received in the next short period # If no further data is received in the next short period
@ -211,13 +195,15 @@ class Monitor(object):
# the last line. This is fix for handling lines sent # the last line. This is fix for handling lines sent
# without EOL. # without EOL.
elif event_tag == TAG_SERIAL_FLUSH: elif event_tag == TAG_SERIAL_FLUSH:
self.handle_serial_input(data, finalize_line=True) self.serial_handler.handle_serial_input(data, self.console_parser, self.coredump,
self.gdb_helper, self._line_matcher,
self.check_gdb_stub_and_run, finalize_line=True)
else: else:
raise RuntimeError('Bad event data %r' % ((event_tag, data),)) raise RuntimeError('Bad event data %r' % ((event_tag, data),))
except KeyboardInterrupt: except KeyboardInterrupt:
try: try:
yellow_print('To exit from IDF monitor please use \"Ctrl+]\"') yellow_print('To exit from IDF monitor please use \"Ctrl+]\"')
self.serial.write(codecs.encode('\x03')) self.serial.write(codecs.encode(CTRL_C))
except serial.SerialException: except serial.SerialException:
pass # this shouldn't happen, but sometimes port has closed in serial thread pass # this shouldn't happen, but sometimes port has closed in serial thread
except UnicodeEncodeError: except UnicodeEncodeError:
@ -234,80 +220,10 @@ class Monitor(object):
# Cancelling _invoke_processing_last_line_timer is not # Cancelling _invoke_processing_last_line_timer is not
# important here because receiving empty data doesn't matter. # important here because receiving empty data doesn't matter.
self._invoke_processing_last_line_timer = None self._invoke_processing_last_line_timer = None
except Exception: except Exception: # noqa
pass pass
normal_print('\n') normal_print('\n')
def check_gdb_stub_and_run(self, line): # type: (bytes) -> None
if self.gdb_helper.check_gdb_stub_trigger(line):
with self: # disable console control
self.gdb_helper.run_gdb()
def handle_serial_input(self, data, finalize_line=False):
# type: (bytes, bool) -> None
# Remove "+" after Continue command
if self.start_cmd_sent is True:
self.start_cmd_sent = False
pos = data.find(b'+')
if pos != -1:
data = data[(pos + 1):]
sp = data.split(b'\n')
if self._last_line_part != b'':
# add unprocessed part from previous "data" to the first line
sp[0] = self._last_line_part + sp[0]
self._last_line_part = b''
if sp[-1] != b'':
# last part is not a full line
self._last_line_part = sp.pop()
for line in sp:
if line == b'':
continue
if self._serial_check_exit and line == self.console_parser.exit_key.encode('latin-1'):
raise SerialStopException()
self.check_panic_decode_trigger(line)
with self.coredump.check(line):
if self._force_line_print or self._line_matcher.match(line.decode(errors='ignore')):
self.logger.print(line + b'\n')
self.handle_possible_pc_address_in_line(line)
self.check_gdb_stub_and_run(line)
self._force_line_print = False
# Now we have the last part (incomplete line) in _last_line_part. By
# default we don't touch it and just wait for the arrival of the rest
# of the line. But after some time when we didn't received it we need
# to make a decision.
force_print_or_matched = any((
self._force_line_print,
(finalize_line and self._line_matcher.match(self._last_line_part.decode(errors='ignore')))
))
if self._last_line_part != b'' and force_print_or_matched:
self._force_line_print = True
self.logger.print(self._last_line_part)
self.handle_possible_pc_address_in_line(self._last_line_part)
self.check_gdb_stub_and_run(self._last_line_part)
# It is possible that the incomplete line cuts in half the PC
# address. A small buffer is kept and will be used the next time
# handle_possible_pc_address_in_line is invoked to avoid this problem.
# MATCH_PCADDR matches 10 character long addresses. Therefore, we
# keep the last 9 characters.
self._pc_address_buffer = self._last_line_part[-9:]
# GDB sequence can be cut in half also. GDB sequence is 7
# characters long, therefore, we save the last 6 characters.
self.gdb_helper.gdb_buffer = self._last_line_part[-6:]
self._last_line_part = b''
# else: keeping _last_line_part and it will be processed the next time
# handle_serial_input is invoked
def handle_possible_pc_address_in_line(self, line): # type: (bytes) -> None
line = self._pc_address_buffer + line
self._pc_address_buffer = b''
if not self.enable_address_decoding:
return
for m in re.finditer(MATCH_PCADDR, line.decode(errors='ignore')):
translation = lookup_pc_address(m.group(), self.toolchain_prefix, self.elf_file)
if translation:
self.logger.print(translation, console_printer=yellow_print)
def __enter__(self): def __enter__(self):
# type: () -> None # type: () -> None
""" Use 'with self' to temporarily disable monitoring behaviour """ """ Use 'with self' to temporarily disable monitoring behaviour """
@ -320,186 +236,20 @@ class Monitor(object):
self.serial_reader.gdb_exit = self.gdb_helper.gdb_exit # write gdb_exit flag self.serial_reader.gdb_exit = self.gdb_helper.gdb_exit # write gdb_exit flag
self.serial_reader.start() self.serial_reader.start()
def check_gdb_stub_and_run(self, line): # type: (bytes) -> None
if self.gdb_helper.check_gdb_stub_trigger(line):
with self: # disable console control
self.gdb_helper.run_gdb()
def run_make(self, target): # type: (str) -> None def run_make(self, target): # type: (str) -> None
with self: with self:
if isinstance(self.make, list): run_make(target, self.make, self.console, self.console_parser, self.event_queue, self.cmd_queue,
popen_args = self.make + [target] self.logger)
else:
popen_args = [self.make, target]
yellow_print('Running %s...' % ' '.join(popen_args))
p = subprocess.Popen(popen_args, env=os.environ)
try:
p.wait()
except KeyboardInterrupt:
p.wait()
if p.returncode != 0:
prompt_next_action('Build failed', self.console, self.console_parser, self.event_queue,
self.cmd_queue)
else:
self.logger.output_enabled = True
def check_panic_decode_trigger(self, line): # type: (bytes) -> None
if self._decode_panic == PANIC_DECODE_DISABLE:
return
if self._reading_panic == PANIC_IDLE and re.search(PANIC_START, line.decode('ascii', errors='ignore')):
self._reading_panic = PANIC_READING
yellow_print('Stack dump detected')
if self._reading_panic == PANIC_READING and PANIC_STACK_DUMP in line:
self.logger.output_enabled = False
if self._reading_panic == PANIC_READING:
self._panic_buffer += line.replace(b'\r', b'') + b'\n'
if self._reading_panic == PANIC_READING and PANIC_END in line:
self._reading_panic = PANIC_IDLE
self.logger.output_enabled = True
self.gdb_helper.process_panic_output(self._panic_buffer, self.logger, self.target)
self._panic_buffer = b''
def handle_commands(self, cmd, chip): # type: (int, str) -> None
config = get_chip_config(chip)
reset_delay = config['reset']
enter_boot_set = config['enter_boot_set']
enter_boot_unset = config['enter_boot_unset']
high = False
low = True
if cmd == CMD_STOP:
self.console_reader.stop()
self.serial_reader.stop()
elif cmd == CMD_RESET:
self.serial.setRTS(low)
self.serial.setDTR(self.serial.dtr) # usbser.sys workaround
time.sleep(reset_delay)
self.serial.setRTS(high)
self.serial.setDTR(self.serial.dtr) # usbser.sys workaround
self.logger.output_enabled = True
elif cmd == CMD_MAKE:
self.run_make('encrypted-flash' if self.encrypted else 'flash')
elif cmd == CMD_APP_FLASH:
self.run_make('encrypted-app-flash' if self.encrypted else 'app-flash')
elif cmd == CMD_OUTPUT_TOGGLE:
self.logger.output_toggle()
elif cmd == CMD_TOGGLE_LOGGING:
self.logger.toggle_logging()
elif cmd == CMD_TOGGLE_TIMESTAMPS:
self.logger.toggle_timestamps()
self.logger.toggle_logging()
elif cmd == CMD_ENTER_BOOT:
self.serial.setDTR(high) # IO0=HIGH
self.serial.setRTS(low) # EN=LOW, chip in reset
self.serial.setDTR(self.serial.dtr) # usbser.sys workaround
time.sleep(enter_boot_set) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
self.serial.setDTR(low) # IO0=LOW
self.serial.setRTS(high) # EN=HIGH, chip out of reset
self.serial.setDTR(self.serial.dtr) # usbser.sys workaround
time.sleep(enter_boot_unset) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
self.serial.setDTR(high) # IO0=HIGH, done
else:
raise RuntimeError('Bad command data %d' % cmd) # type: ignore
def main(): # type: () -> None def main(): # type: () -> None
parser = argparse.ArgumentParser('idf_monitor - a serial output monitor for esp-idf')
parser.add_argument(
'--port', '-p',
help='Serial port device',
default=os.environ.get('ESPTOOL_PORT', '/dev/ttyUSB0')
)
parser.add_argument(
'--disable-address-decoding', '-d',
help="Don't print lines about decoded addresses from the application ELF file",
action='store_true',
default=True if os.environ.get('ESP_MONITOR_DECODE') == 0 else False
)
parser.add_argument(
'--baud', '-b',
help='Serial port baud rate',
type=int,
default=os.getenv('IDF_MONITOR_BAUD', os.getenv('MONITORBAUD', 115200)))
parser.add_argument(
'--make', '-m',
help='Command to run make',
type=str, default='make')
parser.add_argument(
'--encrypted',
help='Use encrypted targets while running make',
action='store_true')
parser.add_argument(
'--toolchain-prefix',
help='Triplet prefix to add before cross-toolchain names',
default=DEFAULT_TOOLCHAIN_PREFIX)
parser.add_argument(
'--eol',
choices=['CR', 'LF', 'CRLF'],
type=lambda c: c.upper(),
help='End of line to use when sending to the serial port',
default='CR')
parser.add_argument(
'elf_file', help='ELF file of application',
type=argparse.FileType('rb'))
parser.add_argument(
'--print_filter',
help='Filtering string',
default=DEFAULT_PRINT_FILTER)
parser.add_argument(
'--decode-coredumps',
choices=[COREDUMP_DECODE_INFO, COREDUMP_DECODE_DISABLE],
default=COREDUMP_DECODE_INFO,
help='Handling of core dumps found in serial output'
)
parser.add_argument(
'--decode-panic',
choices=[PANIC_DECODE_BACKTRACE, PANIC_DECODE_DISABLE],
default=PANIC_DECODE_DISABLE,
help='Handling of panic handler info found in serial output'
)
parser.add_argument(
'--target',
help='Target name (used when stack dump decoding is enabled)',
default=os.environ.get('IDF_TARGET', 'esp32')
)
parser.add_argument(
'--revision',
help='Revision of the target',
type=int,
default=0
)
parser.add_argument(
'--ws',
default=os.environ.get('ESP_IDF_MONITOR_WS', None),
help='WebSocket URL for communicating with IDE tools for debugging purposes'
)
parser.add_argument(
'--timestamps',
help='Add timestamp for each line',
default=False,
action='store_true')
parser.add_argument(
'--timestamp-format',
default=os.environ.get('ESP_IDF_MONITOR_TIMESTAMP_FORMAT', '%Y-%m-%d %H:%M:%S'),
help='Set a strftime()-compatible timestamp format'
)
parser = get_parser()
args = parser.parse_args() args = parser.parse_args()
# GDB uses CreateFile to open COM port, which requires the COM name to be r'\\.\COMx' if the COM # GDB uses CreateFile to open COM port, which requires the COM name to be r'\\.\COMx' if the COM
@ -513,8 +263,7 @@ def main(): # type: () -> None
yellow_print('--- WARNING: Serial ports accessed as /dev/tty.* will hang gdb if launched.') yellow_print('--- WARNING: Serial ports accessed as /dev/tty.* will hang gdb if launched.')
yellow_print('--- Using %s instead...' % args.port) yellow_print('--- Using %s instead...' % args.port)
serial_instance = serial.serial_for_url(args.port, args.baud, serial_instance = serial.serial_for_url(args.port, args.baud, do_not_open=True)
do_not_open=True)
serial_instance.dtr = False serial_instance.dtr = False
serial_instance.rts = False serial_instance.rts = False
args.elf_file.close() # don't need this as a file args.elf_file.close() # don't need this as a file
@ -524,18 +273,17 @@ def main(): # type: () -> None
# all of the child makes we need (the -j argument remains part of # all of the child makes we need (the -j argument remains part of
# MAKEFLAGS) # MAKEFLAGS)
try: try:
makeflags = os.environ['MAKEFLAGS'] makeflags = os.environ[MAKEFLAGS_ENVIRON]
makeflags = re.sub(r'--jobserver[^ =]*=[0-9,]+ ?', '', makeflags) makeflags = re.sub(r'--jobserver[^ =]*=[0-9,]+ ?', '', makeflags)
os.environ['MAKEFLAGS'] = makeflags os.environ[MAKEFLAGS_ENVIRON] = makeflags
except KeyError: except KeyError:
pass # not running a make jobserver pass # not running a make jobserver
# Pass the actual used port to callee of idf_monitor (e.g. make) through `ESPPORT` environment # Pass the actual used port to callee of idf_monitor (e.g. make) through `ESPPORT` environment
# variable # variable
# To make sure the key as well as the value are str type, by the requirements of subprocess # To make sure the key as well as the value are str type, by the requirements of subprocess
espport_key = str('ESPPORT')
espport_val = str(args.port) espport_val = str(args.port)
os.environ.update({espport_key: espport_val}) os.environ.update({ESPPORT_ENVIRON: espport_val})
ws = WebSocketClient(args.ws) if args.ws else None ws = WebSocketClient(args.ws) if args.ws else None
try: try:
@ -553,7 +301,6 @@ def main(): # type: () -> None
key_description(CTRL_H))) key_description(CTRL_H)))
if args.print_filter != DEFAULT_PRINT_FILTER: if args.print_filter != DEFAULT_PRINT_FILTER:
yellow_print('--- Print filter: {} ---'.format(args.print_filter)) yellow_print('--- Print filter: {} ---'.format(args.print_filter))
monitor.main_loop() monitor.main_loop()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass

View File

@ -17,11 +17,7 @@ import os
import re import re
import sys import sys
from io import TextIOBase from io import TextIOBase
from typing import Any, Optional, TextIO, Union
try:
from typing import Optional, Union
except ImportError:
pass
from .output_helpers import ANSI_NORMAL from .output_helpers import ANSI_NORMAL
@ -44,7 +40,7 @@ if os.name == 'nt':
def get_converter(orig_output_method=None, decode_output=False): def get_converter(orig_output_method=None, decode_output=False):
# type: (Optional[TextIOBase], bool) -> Union[ANSIColorConverter, Optional[TextIOBase]] # type: (Any[TextIO, Optional[TextIOBase]], bool) -> Union[ANSIColorConverter, Optional[TextIOBase]]
""" """
Returns an ANSIColorConverter on Windows and the original output method (orig_output_method) on other platforms. Returns an ANSIColorConverter on Windows and the original output method (orig_output_method) on other platforms.
The ANSIColorConverter with decode_output=True will decode the bytes before passing them to the output. The ANSIColorConverter with decode_output=True will decode the bytes before passing them to the output.

View File

@ -0,0 +1,120 @@
# Copyright 2015-2021 Espressif Systems (Shanghai) CO LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import os
from .constants import DEFAULT_PRINT_FILTER, DEFAULT_TOOLCHAIN_PREFIX, PANIC_DECODE_BACKTRACE, PANIC_DECODE_DISABLE
from .coredump import COREDUMP_DECODE_DISABLE, COREDUMP_DECODE_INFO
def get_parser(): # type: () -> argparse.ArgumentParser
parser = argparse.ArgumentParser('idf_monitor - a serial output monitor for esp-idf')
parser.add_argument(
'--port', '-p',
help='Serial port device',
default=os.environ.get('ESPTOOL_PORT', '/dev/ttyUSB0')
)
parser.add_argument(
'--disable-address-decoding', '-d',
help="Don't print lines about decoded addresses from the application ELF file",
action='store_true',
default=os.environ.get('ESP_MONITOR_DECODE') == 0
)
parser.add_argument(
'--baud', '-b',
help='Serial port baud rate',
type=int,
default=os.getenv('IDF_MONITOR_BAUD', os.getenv('MONITORBAUD', 115200)))
parser.add_argument(
'--make', '-m',
help='Command to run make',
type=str, default='make')
parser.add_argument(
'--encrypted',
help='Use encrypted targets while running make',
action='store_true')
parser.add_argument(
'--toolchain-prefix',
help='Triplet prefix to add before cross-toolchain names',
default=DEFAULT_TOOLCHAIN_PREFIX)
parser.add_argument(
'--eol',
choices=['CR', 'LF', 'CRLF'],
type=lambda c: c.upper(),
help='End of line to use when sending to the serial port',
default='CR')
parser.add_argument(
'elf_file', help='ELF file of application',
type=argparse.FileType('rb'))
parser.add_argument(
'--print_filter',
help='Filtering string',
default=DEFAULT_PRINT_FILTER)
parser.add_argument(
'--decode-coredumps',
choices=[COREDUMP_DECODE_INFO, COREDUMP_DECODE_DISABLE],
default=COREDUMP_DECODE_INFO,
help='Handling of core dumps found in serial output'
)
parser.add_argument(
'--decode-panic',
choices=[PANIC_DECODE_BACKTRACE, PANIC_DECODE_DISABLE],
default=PANIC_DECODE_DISABLE,
help='Handling of panic handler info found in serial output'
)
parser.add_argument(
'--target',
help='Target name (used when stack dump decoding is enabled)',
default=os.environ.get('IDF_TARGET', 'esp32')
)
parser.add_argument(
'--revision',
help='Revision of the target',
type=int,
default=0
)
parser.add_argument(
'--ws',
default=os.environ.get('ESP_IDF_MONITOR_WS', None),
help='WebSocket URL for communicating with IDE tools for debugging purposes'
)
parser.add_argument(
'--timestamps',
help='Add timestamp for each line',
default=False,
action='store_true')
parser.add_argument(
'--timestamp-format',
default=os.environ.get('ESP_IDF_MONITOR_TIMESTAMP_FORMAT', '%Y-%m-%d %H:%M:%S'),
help='Set a strftime()-compatible timestamp format'
)
return parser

View File

@ -12,15 +12,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import textwrap
try:
from typing import Optional
except ImportError:
pass
import queue import queue
import textwrap
from typing import Optional
from serial.tools import miniterm from serial.tools import miniterm

View File

@ -14,6 +14,7 @@
import os import os
import queue
import time import time
from serial.tools.miniterm import Console from serial.tools.miniterm import Console
@ -22,11 +23,6 @@ from .console_parser import ConsoleParser
from .constants import CMD_STOP, TAG_CMD from .constants import CMD_STOP, TAG_CMD
from .stoppable_thread import StoppableThread from .stoppable_thread import StoppableThread
try:
import queue
except ImportError:
import Queue as queue # type: ignore
class ConsoleReader(StoppableThread): class ConsoleReader(StoppableThread):
""" Read input keys from the console and push them to the queue, """ Read input keys from the console and push them to the queue,

View File

@ -18,6 +18,7 @@ import re
# Control-key characters # Control-key characters
CTRL_A = '\x01' CTRL_A = '\x01'
CTRL_B = '\x02' CTRL_B = '\x02'
CTRL_C = '\x03'
CTRL_F = '\x06' CTRL_F = '\x06'
CTRL_H = '\x08' CTRL_H = '\x08'
CTRL_I = '\x09' CTRL_I = '\x09'
@ -47,7 +48,6 @@ TAG_CMD = 3
__version__ = '1.1' __version__ = '1.1'
# paths to scripts # paths to scripts
PANIC_OUTPUT_DECODE_SCRIPT = os.path.join(os.path.dirname(__file__), '..', 'gdb_panic_server.py') PANIC_OUTPUT_DECODE_SCRIPT = os.path.join(os.path.dirname(__file__), '..', 'gdb_panic_server.py')
COREDUMP_SCRIPT = os.path.join(os.path.dirname(__file__), '..', '..', 'components', 'espcoredump', 'espcoredump.py') COREDUMP_SCRIPT = os.path.join(os.path.dirname(__file__), '..', '..', 'components', 'espcoredump', 'espcoredump.py')
@ -71,3 +71,19 @@ PANIC_READING = 1
# panic handler decoding options # panic handler decoding options
PANIC_DECODE_DISABLE = 'disable' PANIC_DECODE_DISABLE = 'disable'
PANIC_DECODE_BACKTRACE = 'backtrace' PANIC_DECODE_BACKTRACE = 'backtrace'
EVENT_QUEUE_TIMEOUT = 0.03 # timeout before raising queue.Empty exception in case of empty event queue
ESPPORT_ENVIRON = str('ESPPORT')
MAKEFLAGS_ENVIRON = 'MAKEFLAGS'
GDB_UART_CONTINUE_COMMAND = '+$c#63'
GDB_EXIT_TIMEOUT = 0.3 # time delay between exit and writing GDB_UART_CONTINUE_COMMAND
# workaround for data sent without EOL
# if no data received during the time, last line is considered finished
LAST_LINE_THREAD_INTERVAL = 0.1
MINIMAL_EN_LOW_DELAY = 0.005
RECONNECT_DELAY = 0.5 # timeout between reconnect tries
CHECK_ALIVE_FLAG_TIMEOUT = 0.25 # timeout for checking alive flags (currently used by serial reader)

View File

@ -7,7 +7,8 @@ from contextlib import contextmanager
from typing import Generator from typing import Generator
from .constants import COREDUMP_SCRIPT, TAG_KEY from .constants import COREDUMP_SCRIPT, TAG_KEY
from .output_helpers import Logger, yellow_print from .logger import Logger
from .output_helpers import yellow_print
from .web_socket_client import WebSocketClient from .web_socket_client import WebSocketClient
# coredump related messages # coredump related messages

View File

@ -5,7 +5,8 @@ import sys
import tempfile import tempfile
from .constants import PANIC_OUTPUT_DECODE_SCRIPT from .constants import PANIC_OUTPUT_DECODE_SCRIPT
from .output_helpers import Logger, normal_print, red_print, yellow_print from .logger import Logger
from .output_helpers import normal_print, red_print, yellow_print
from .web_socket_client import WebSocketClient from .web_socket_client import WebSocketClient

View File

@ -0,0 +1,134 @@
# Copyright 2015-2021 Espressif Systems (Shanghai) CO LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import os
import re
from typing import BinaryIO, Callable, Optional, Union # noqa: F401
from serial.tools import miniterm # noqa: F401
from .constants import MATCH_PCADDR
from .output_helpers import lookup_pc_address, red_print, yellow_print
class Logger:
def __init__(self, elf_file, console, timestamps, timestamp_format, pc_address_buffer, enable_address_decoding,
toolchain_prefix):
# type: (str, miniterm.Console, bool, str, bytes, bool, str) -> None
self.log_file = None # type: Optional[BinaryIO]
self._output_enabled = True # type: bool
self.elf_file = elf_file
self.console = console
self.timestamps = timestamps
self.timestamp_format = timestamp_format
self._pc_address_buffer = pc_address_buffer
self.enable_address_decoding = enable_address_decoding
self.toolchain_prefix = toolchain_prefix
@property
def pc_address_buffer(self): # type: () -> bytes
return self._pc_address_buffer
@pc_address_buffer.setter
def pc_address_buffer(self, value): # type: (bytes) -> None
self._pc_address_buffer = value
@property
def output_enabled(self): # type: () -> bool
return self._output_enabled
@output_enabled.setter
def output_enabled(self, value): # type: (bool) -> None
self._output_enabled = value
@property
def log_file(self): # type: () -> Optional[BinaryIO]
return self._log_file
@log_file.setter
def log_file(self, value): # type: (Optional[BinaryIO]) -> None
self._log_file = value
def toggle_logging(self): # type: () -> None
if self._log_file:
self.stop_logging()
else:
self.start_logging()
def toggle_timestamps(self): # type: () -> None
self.timestamps = not self.timestamps
def start_logging(self): # type: () -> None
if not self._log_file:
name = 'log.{}.{}.txt'.format(os.path.splitext(os.path.basename(self.elf_file))[0],
datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
try:
self.log_file = open(name, 'wb+')
yellow_print('\nLogging is enabled into file {}'.format(name))
except Exception as e: # noqa
red_print('\nLog file {} cannot be created: {}'.format(name, e))
def stop_logging(self): # type: () -> None
if self._log_file:
try:
name = self._log_file.name
self._log_file.close()
yellow_print('\nLogging is disabled and file {} has been closed'.format(name))
except Exception as e: # noqa
red_print('\nLog file cannot be closed: {}'.format(e))
finally:
self._log_file = None
def print(self, string, console_printer=None): # noqa: E999
# type: (Union[str, bytes], Optional[Callable]) -> None
if console_printer is None:
console_printer = self.console.write_bytes
if self.timestamps and (self._output_enabled or self._log_file):
t = datetime.datetime.now().strftime(self.timestamp_format)
# "string" is not guaranteed to be a full line. Timestamps should be only at the beginning of lines.
if isinstance(string, type(u'')):
search_patt = '\n'
replacement = '\n' + t + ' '
else:
search_patt = b'\n' # type: ignore
replacement = b'\n' + t.encode('ascii') + b' ' # type: ignore
string = string.replace(search_patt, replacement) # type: ignore
if self._output_enabled:
console_printer(string)
if self._log_file:
try:
if isinstance(string, type(u'')):
string = string.encode() # type: ignore
self._log_file.write(string) # type: ignore
except Exception as e:
red_print('\nCannot write to file: {}'.format(e))
# don't fill-up the screen with the previous errors (probably consequent prints would fail also)
self.stop_logging()
def output_toggle(self): # type: () -> None
self.output_enabled = not self.output_enabled
yellow_print('\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.'.format(
self.output_enabled))
def handle_possible_pc_address_in_line(self, line): # type: (bytes) -> None
line = self._pc_address_buffer + line
self._pc_address_buffer = b''
if not self.enable_address_decoding:
return
for m in re.finditer(MATCH_PCADDR, line.decode(errors='ignore')):
translation = lookup_pc_address(m.group(), self.toolchain_prefix, self.elf_file)
if translation:
self.print(translation, console_printer=yellow_print)

View File

@ -12,17 +12,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import datetime
import os
import subprocess import subprocess
import sys import sys
from typing import BinaryIO, Callable, Optional, Union # noqa: F401
from serial.tools import miniterm
try:
from typing import BinaryIO, Callable, Optional, Union # noqa
except ImportError:
pass
# ANSI terminal codes (if changed, regular expressions in LineMatcher need to be updated) # ANSI terminal codes (if changed, regular expressions in LineMatcher need to be updated)
ANSI_RED = '\033[1;31m' ANSI_RED = '\033[1;31m'
@ -57,92 +49,3 @@ def lookup_pc_address(pc_addr, toolchain_prefix, elf_file): # type: (str, str,
except OSError as e: except OSError as e:
red_print('%s: %s' % (' '.join(cmd), e)) red_print('%s: %s' % (' '.join(cmd), e))
return None return None
class Logger:
def __init__(self, elf_file, console, timestamps, timestamp_format):
# type: (str, miniterm.Console, bool, str) -> None
self.log_file = None # type: Optional[BinaryIO]
self._output_enabled = True # type: bool
self.elf_file = elf_file
self.console = console
self.timestamps = timestamps
self.timestamp_format = timestamp_format
@property
def output_enabled(self): # type: () -> bool
return self._output_enabled
@output_enabled.setter
def output_enabled(self, value): # type: (bool) -> None
self._output_enabled = value
@property
def log_file(self): # type: () -> Optional[BinaryIO]
return self._log_file
@log_file.setter
def log_file(self, value): # type: (Optional[BinaryIO]) -> None
self._log_file = value
def toggle_logging(self): # type: () -> None
if self._log_file:
self.stop_logging()
else:
self.start_logging()
def toggle_timestamps(self): # type: () -> None
self.timestamps = not self.timestamps
def start_logging(self): # type: () -> None
if not self._log_file:
name = 'log.{}.{}.txt'.format(os.path.splitext(os.path.basename(self.elf_file))[0],
datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
try:
self.log_file = open(name, 'wb+')
yellow_print('\nLogging is enabled into file {}'.format(name))
except Exception as e:
red_print('\nLog file {} cannot be created: {}'.format(name, e))
def stop_logging(self): # type: () -> None
if self._log_file:
try:
name = self._log_file.name
self._log_file.close()
yellow_print('\nLogging is disabled and file {} has been closed'.format(name))
except Exception as e:
red_print('\nLog file cannot be closed: {}'.format(e))
finally:
self._log_file = None
def print(self, string, console_printer=None): # noqa: E999
# type: (Union[str, bytes], Optional[Callable]) -> None
if console_printer is None:
console_printer = self.console.write_bytes
if self.timestamps and (self._output_enabled or self._log_file):
t = datetime.datetime.now().strftime(self.timestamp_format)
# "string" is not guaranteed to be a full line. Timestamps should be only at the beginning of lines.
if isinstance(string, type(u'')):
search_patt = '\n'
replacement = '\n' + t + ' '
else:
search_patt = b'\n' # type: ignore
replacement = b'\n' + t.encode('ascii') + b' ' # type: ignore
string = string.replace(search_patt, replacement) # type: ignore
if self._output_enabled:
console_printer(string)
if self._log_file:
try:
if isinstance(string, type(u'')):
string = string.encode() # type: ignore
self._log_file.write(string) # type: ignore
except Exception as e:
red_print('\nCannot write to file: {}'.format(e))
# don't fill-up the screen with the previous errors (probably consequent prints would fail also)
self.stop_logging()
def output_toggle(self): # type: () -> None
self.output_enabled = not self.output_enabled
yellow_print('\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.'.format(
self.output_enabled))

View File

@ -0,0 +1,195 @@
# Copyright 2015-2021 Espressif Systems (Shanghai) CO LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import queue # noqa: F401
import re
import subprocess
import time
from typing import Callable
import serial # noqa: F401
from serial.tools import miniterm # noqa: F401
from .chip_specific_config import get_chip_config
from .console_parser import ConsoleParser, prompt_next_action # noqa: F401
from .console_reader import ConsoleReader # noqa: F401
from .constants import (CMD_APP_FLASH, CMD_ENTER_BOOT, CMD_MAKE, CMD_OUTPUT_TOGGLE, CMD_RESET, CMD_STOP,
CMD_TOGGLE_LOGGING, CMD_TOGGLE_TIMESTAMPS, PANIC_DECODE_DISABLE, PANIC_END, PANIC_IDLE,
PANIC_READING, PANIC_STACK_DUMP, PANIC_START)
from .coredump import CoreDump # noqa: F401
from .exceptions import SerialStopException # noqa: F401
from .gdbhelper import GDBHelper # noqa: F401
from .line_matcher import LineMatcher # noqa: F401
from .logger import Logger # noqa: F401
from .output_helpers import yellow_print
from .serial_reader import SerialReader # noqa: F401
def run_make(target, make, console, console_parser, event_queue, cmd_queue, logger):
# type: (str, str, miniterm.Console, ConsoleParser, queue.Queue, queue.Queue, Logger) -> None
if isinstance(make, list):
popen_args = make + [target]
else:
popen_args = [make, target]
yellow_print('Running %s...' % ' '.join(popen_args))
p = subprocess.Popen(popen_args, env=os.environ)
try:
p.wait()
except KeyboardInterrupt:
p.wait()
if p.returncode != 0:
prompt_next_action('Build failed', console, console_parser, event_queue, cmd_queue)
else:
logger.output_enabled = True
class SerialHandler:
"""
The class is responsible for buffering serial input and performing corresponding commands.
"""
def __init__(self, last_line_part, serial_check_exit, logger, decode_panic, reading_panic, panic_buffer, target,
force_line_print, start_cmd_sent, serial_instance, encrypted, ):
# type: (bytes, bool, Logger, str, int, bytes,str, bool, bool, serial.Serial, bool) -> None
self._last_line_part = last_line_part
self._serial_check_exit = serial_check_exit
self.logger = logger
self._decode_panic = decode_panic
self._reading_panic = reading_panic
self._panic_buffer = panic_buffer
self.target = target
self._force_line_print = force_line_print
self.start_cmd_sent = start_cmd_sent
self.serial_instance = serial_instance
self.encrypted = encrypted
def handle_serial_input(self, data, console_parser, coredump, gdb_helper, line_matcher,
check_gdb_stub_and_run, finalize_line=False):
# type: (bytes, ConsoleParser, CoreDump, GDBHelper, LineMatcher, Callable, bool) -> None
# Remove "+" after Continue command
if self.start_cmd_sent:
self.start_cmd_sent = False
pos = data.find(b'+')
if pos != -1:
data = data[(pos + 1):]
sp = data.split(b'\n')
if self._last_line_part != b'':
# add unprocessed part from previous "data" to the first line
sp[0] = self._last_line_part + sp[0]
self._last_line_part = b''
if sp[-1] != b'':
# last part is not a full line
self._last_line_part = sp.pop()
for line in sp:
if line == b'':
continue
if self._serial_check_exit and line == console_parser.exit_key.encode('latin-1'):
raise SerialStopException()
self.check_panic_decode_trigger(line, gdb_helper)
with coredump.check(line):
if self._force_line_print or line_matcher.match(line.decode(errors='ignore')):
self.logger.print(line + b'\n')
self.logger.handle_possible_pc_address_in_line(line)
check_gdb_stub_and_run(line)
self._force_line_print = False
# Now we have the last part (incomplete line) in _last_line_part. By
# default we don't touch it and just wait for the arrival of the rest
# of the line. But after some time when we didn't received it we need
# to make a decision.
force_print_or_matched = any((
self._force_line_print,
(finalize_line and line_matcher.match(self._last_line_part.decode(errors='ignore')))
))
if self._last_line_part != b'' and force_print_or_matched:
self._force_line_print = True
self.logger.print(self._last_line_part)
self.logger.handle_possible_pc_address_in_line(self._last_line_part)
check_gdb_stub_and_run(self._last_line_part)
# It is possible that the incomplete line cuts in half the PC
# address. A small buffer is kept and will be used the next time
# handle_possible_pc_address_in_line is invoked to avoid this problem.
# MATCH_PCADDR matches 10 character long addresses. Therefore, we
# keep the last 9 characters.
self.logger.pc_address_buffer = self._last_line_part[-9:]
# GDB sequence can be cut in half also. GDB sequence is 7
# characters long, therefore, we save the last 6 characters.
gdb_helper.gdb_buffer = self._last_line_part[-6:]
self._last_line_part = b''
# else: keeping _last_line_part and it will be processed the next time
# handle_serial_input is invoked
def check_panic_decode_trigger(self, line, gdb_helper): # type: (bytes, GDBHelper) -> None
if self._decode_panic == PANIC_DECODE_DISABLE:
return
if self._reading_panic == PANIC_IDLE and re.search(PANIC_START, line.decode('ascii', errors='ignore')):
self._reading_panic = PANIC_READING
yellow_print('Stack dump detected')
if self._reading_panic == PANIC_READING and PANIC_STACK_DUMP in line:
self.logger.output_enabled = False
if self._reading_panic == PANIC_READING:
self._panic_buffer += line.replace(b'\r', b'') + b'\n'
if self._reading_panic == PANIC_READING and PANIC_END in line:
self._reading_panic = PANIC_IDLE
self.logger.output_enabled = True
gdb_helper.process_panic_output(self._panic_buffer, self.logger, self.target)
self._panic_buffer = b''
def handle_commands(self, cmd, chip, run_make_func, console_reader, serial_reader):
# type: (int, str, Callable, ConsoleReader, SerialReader) -> None
config = get_chip_config(chip)
reset_delay = config['reset']
enter_boot_set = config['enter_boot_set']
enter_boot_unset = config['enter_boot_unset']
high = False
low = True
if cmd == CMD_STOP:
console_reader.stop()
serial_reader.stop()
elif cmd == CMD_RESET:
self.serial_instance.setRTS(low)
self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround
time.sleep(reset_delay)
self.serial_instance.setRTS(high)
self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround
self.logger.output_enabled = True
elif cmd == CMD_MAKE:
run_make_func('encrypted-flash' if self.encrypted else 'flash')
elif cmd == CMD_APP_FLASH:
run_make_func('encrypted-app-flash' if self.encrypted else 'app-flash')
elif cmd == CMD_OUTPUT_TOGGLE:
self.logger.output_toggle()
elif cmd == CMD_TOGGLE_LOGGING:
self.logger.toggle_logging()
elif cmd == CMD_TOGGLE_TIMESTAMPS:
self.logger.toggle_timestamps()
self.logger.toggle_logging()
elif cmd == CMD_ENTER_BOOT:
self.serial_instance.setDTR(high) # IO0=HIGH
self.serial_instance.setRTS(low) # EN=LOW, chip in reset
self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround
time.sleep(enter_boot_set) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
self.serial_instance.setDTR(low) # IO0=LOW
self.serial_instance.setRTS(high) # EN=HIGH, chip out of reset
self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround
time.sleep(enter_boot_unset) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
self.serial_instance.setDTR(high) # IO0=HIGH, done
else:
raise RuntimeError('Bad command data %d' % cmd) # type: ignore

View File

@ -12,26 +12,21 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import queue
import sys import sys
import time import time
import serial import serial
from .constants import TAG_SERIAL from .constants import CHECK_ALIVE_FLAG_TIMEOUT, MINIMAL_EN_LOW_DELAY, RECONNECT_DELAY, TAG_SERIAL
from .output_helpers import red_print, yellow_print from .output_helpers import red_print, yellow_print
from .stoppable_thread import StoppableThread from .stoppable_thread import StoppableThread
try:
import queue
except ImportError:
import Queue as queue # type: ignore # noqa
class SerialReader(StoppableThread): class SerialReader(StoppableThread):
""" Read serial data from the serial port and push to the """ Read serial data from the serial port and push to the
event queue, until stopped. event queue, until stopped.
""" """
def __init__(self, serial_instance, event_queue): def __init__(self, serial_instance, event_queue):
# type: (serial.Serial, queue.Queue) -> None # type: (serial.Serial, queue.Queue) -> None
super(SerialReader, self).__init__() super(SerialReader, self).__init__()
@ -42,7 +37,7 @@ class SerialReader(StoppableThread):
if not hasattr(self.serial, 'cancel_read'): if not hasattr(self.serial, 'cancel_read'):
# enable timeout for checking alive flag, # enable timeout for checking alive flag,
# if cancel_read not available # if cancel_read not available
self.serial.timeout = 0.25 self.serial.timeout = CHECK_ALIVE_FLAG_TIMEOUT
def run(self): def run(self):
# type: () -> None # type: () -> None
@ -51,18 +46,23 @@ class SerialReader(StoppableThread):
# We can come to this thread at startup or from external application line GDB. # We can come to this thread at startup or from external application line GDB.
# If we come from GDB we would like to continue to run without reset. # If we come from GDB we would like to continue to run without reset.
self.serial.dtr = True # Non reset state high = False
self.serial.rts = False # IO0=HIGH low = True
self.serial.dtr = low # Non reset state
self.serial.rts = high # IO0=HIGH
self.serial.dtr = self.serial.dtr # usbser.sys workaround self.serial.dtr = self.serial.dtr # usbser.sys workaround
# Current state not reset the target! # Current state not reset the target!
self.serial.open() self.serial.open()
if not self.gdb_exit: if not self.gdb_exit:
self.serial.dtr = False # Set dtr to reset state (affected by rts) self.serial.dtr = high # Set dtr to reset state (affected by rts)
self.serial.rts = True # Set rts/dtr to the reset state self.serial.rts = low # Set rts/dtr to the reset state
self.serial.dtr = self.serial.dtr # usbser.sys workaround self.serial.dtr = self.serial.dtr # usbser.sys workaround
time.sleep(0.005) # Add a delay to meet the requirements of minimal EN low time (2ms for ESP32-C3)
# Add a delay to meet the requirements of minimal EN low time (2ms for ESP32-C3)
time.sleep(MINIMAL_EN_LOW_DELAY)
self.gdb_exit = False self.gdb_exit = False
self.serial.rts = False # Set rts/dtr to the working state self.serial.rts = high # Set rts/dtr to the working state
self.serial.dtr = self.serial.dtr # usbser.sys workaround self.serial.dtr = self.serial.dtr # usbser.sys workaround
try: try:
while self.alive: while self.alive:
@ -71,13 +71,13 @@ class SerialReader(StoppableThread):
except (serial.serialutil.SerialException, IOError) as e: except (serial.serialutil.SerialException, IOError) as e:
data = b'' data = b''
# self.serial.open() was successful before, therefore, this is an issue related to # self.serial.open() was successful before, therefore, this is an issue related to
# the disapperence of the device # the disappearance of the device
red_print(e) red_print(e)
yellow_print('Waiting for the device to reconnect', newline='') yellow_print('Waiting for the device to reconnect', newline='')
self.serial.close() self.serial.close()
while self.alive: # so that exiting monitor works while waiting while self.alive: # so that exiting monitor works while waiting
try: try:
time.sleep(0.5) time.sleep(RECONNECT_DELAY)
self.serial.open() self.serial.open()
break # device connected break # device connected
except serial.serialutil.SerialException: except serial.serialutil.SerialException:

View File

@ -13,11 +13,7 @@
# limitations under the License. # limitations under the License.
import threading import threading
from typing import Optional
try:
from typing import Optional
except ImportError:
pass
class StoppableThread(object): class StoppableThread(object):