Moved filters out of idf_monitor.py

This commit is contained in:
Martin Gaňo 2021-05-14 11:20:38 +02:00
parent d3de177c64
commit 252036567c
8 changed files with 383 additions and 295 deletions

View File

@ -96,7 +96,7 @@
- "tools/esp_app_trace/**/*"
- "tools/ldgen/**/*"
- "tools/gdb_panic_server.py"
- "tools/idf_monitor_base/*"
- "tools/idf_monitor.py"
- "tools/test_idf_monitor/**/*"

View File

@ -219,7 +219,6 @@ tools/esp_prov/utils/convenience.py
tools/find_apps.py
tools/find_build_apps/common.py
tools/find_build_apps/make.py
tools/gdb_panic_server.py
tools/gen_esp_err_to_name.py
tools/idf.py
tools/idf_py_actions/core_ext.py

View File

@ -75,7 +75,7 @@ GDB_REGS_INFO = {
PanicInfo = namedtuple('PanicInfo', 'core_id regs stack_base_addr stack_data')
def build_riscv_panic_output_parser(): # type: () -> typing.Type[ParserElement]
def build_riscv_panic_output_parser(): # type: () -> typing.Any[typing.Type[ParserElement]]
"""Builds a parser for the panic handler output using pyparsing"""
# We don't match the first line, since "Guru Meditation" will not be printed in case of an abort:
@ -258,7 +258,7 @@ class GdbServer(object):
# For any memory address that is not on the stack, pretend the value is 0x00.
# GDB should never ask us for program memory, it will be obtained from the ELF file.
def in_stack(addr):
def in_stack(addr): # type: (int) -> typing.Any[bool]
return stack_addr_min <= addr < stack_addr_max
result = ''
@ -271,7 +271,7 @@ class GdbServer(object):
self._respond(result)
def main():
def main(): # type: () -> None
parser = argparse.ArgumentParser()
parser.add_argument('input_file', type=argparse.FileType('r'),
help='File containing the panic handler output')

View File

@ -34,32 +34,36 @@ from __future__ import division, print_function, unicode_literals
import argparse
import codecs
import datetime
import os
import re
import subprocess
import threading
import time
from builtins import bytes, object
from typing import AnyStr, BinaryIO, Callable, List, Optional, Union
import serial.tools.miniterm as miniterm
try:
from typing import List, Optional, Union # noqa
except ImportError:
pass
from idf_monitor_base import (COREDUMP_DECODE_DISABLE, COREDUMP_DECODE_INFO, COREDUMP_DONE, COREDUMP_IDLE,
COREDUMP_READING, COREDUMP_UART_END, COREDUMP_UART_PROMPT, COREDUMP_UART_START,
DEFAULT_PRINT_FILTER, DEFAULT_TOOLCHAIN_PREFIX, MATCH_PCADDR, PANIC_DECODE_BACKTRACE,
PANIC_DECODE_DISABLE, PANIC_END, PANIC_IDLE, PANIC_READING, PANIC_STACK_DUMP,
PANIC_START)
PANIC_START, prompt_next_action)
from idf_monitor_base.chip_specific_config import get_chip_config
from idf_monitor_base.console_parser import ConsoleParser
from idf_monitor_base.console_reader import ConsoleReader
from idf_monitor_base.constants import (CMD_APP_FLASH, CMD_ENTER_BOOT, CMD_MAKE, CMD_OUTPUT_TOGGLE, CMD_RESET,
CMD_STOP, CMD_TOGGLE_LOGGING, CMD_TOGGLE_TIMESTAMPS, CTRL_H, CTRL_T, TAG_CMD,
TAG_KEY, TAG_SERIAL, TAG_SERIAL_FLUSH)
CMD_STOP, CMD_TOGGLE_LOGGING, CMD_TOGGLE_TIMESTAMPS, CTRL_H, TAG_CMD, TAG_KEY,
TAG_SERIAL, TAG_SERIAL_FLUSH)
from idf_monitor_base.exceptions import SerialStopException
from idf_monitor_base.gdbhelper import GDBHelper
from idf_monitor_base.line_matcher import LineMatcher
from idf_monitor_base.output_helpers import normal_print, red_print, yellow_print
from idf_monitor_base.output_helpers import Logger, lookup_pc_address, normal_print, yellow_print
from idf_monitor_base.serial_reader import SerialReader
from idf_monitor_base.web_socket_client import WebSocketClient
from serial.tools import miniterm
try:
import queue # noqa
@ -89,21 +93,21 @@ class Monitor(object):
"""
def __init__(
self,
serial_instance, # type: serial.Serial
elf_file, # type: str
print_filter, # type: str
make='make', # type: str
encrypted=False, # type: bool
toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX, # type: str
eol='CRLF', # type: str
decode_coredumps=COREDUMP_DECODE_INFO, # type: str
decode_panic=PANIC_DECODE_DISABLE, # type: str
target='esp32', # type: str
websocket_client=None, # type: WebSocketClient
enable_address_decoding=True, # type: bool
timestamps=False, # type: bool
timestamp_format='' # type: str
self,
serial_instance, # type: serial.Serial
elf_file, # type: str
print_filter, # type: str
make='make', # type: str
encrypted=False, # type: bool
toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX, # type: str
eol='CRLF', # type: str
decode_coredumps=COREDUMP_DECODE_INFO, # type: str
decode_panic=PANIC_DECODE_DISABLE, # type: str
target='esp32', # type: str
websocket_client=None, # type: WebSocketClient
enable_address_decoding=True, # type: bool
timestamps=False, # type: bool
timestamp_format='' # type: str
):
super(Monitor, self).__init__()
self.event_queue = queue.Queue() # type: queue.Queue
@ -136,24 +140,21 @@ class Monitor(object):
# internal state
self._last_line_part = b''
self._gdb_buffer = b''
self._pc_address_buffer = b''
self._line_matcher = LineMatcher(print_filter)
self._invoke_processing_last_line_timer = None # type: Optional[threading.Timer]
self._force_line_print = False
self._output_enabled = True
self._serial_check_exit = socket_mode
self._log_file = None # type: Optional[BinaryIO]
self._decode_coredumps = decode_coredumps
self._reading_coredump = COREDUMP_IDLE
self._coredump_buffer = b''
self._decode_panic = decode_panic
self._reading_panic = PANIC_IDLE
self._panic_buffer = b''
self.gdb_exit = False
self.start_cmd_sent = False
self._timestamps = timestamps
self._timestamp_format = timestamp_format
self.gdb_helper = GDBHelper(self.toolchain_prefix, self.websocket_client, self.elf_file, self.serial.port,
self.serial.baudrate)
self.logger = Logger(self.elf_file, self.console, timestamps, timestamp_format)
def invoke_processing_last_line(self):
# type: () -> None
@ -163,13 +164,13 @@ class Monitor(object):
# type: () -> None
self.console_reader.start()
self.serial_reader.start()
self.gdb_exit = False
self.gdb_helper.gdb_exit = False
self.start_cmd_sent = False
try:
while self.console_reader.alive and self.serial_reader.alive:
try:
if self.gdb_exit is True:
self.gdb_exit = False
if self.gdb_helper.gdb_exit:
self.gdb_helper.gdb_exit = False
time.sleep(0.3)
try:
@ -204,7 +205,8 @@ class Monitor(object):
self.handle_serial_input(data)
if self._invoke_processing_last_line_timer is not None:
self._invoke_processing_last_line_timer.cancel()
self._invoke_processing_last_line_timer = threading.Timer(0.1, self.invoke_processing_last_line)
self._invoke_processing_last_line_timer = threading.Timer(0.1,
self.invoke_processing_last_line)
self._invoke_processing_last_line_timer.start()
# If no futher data is received in the next short period
# of time then the _invoke_processing_last_line_timer
@ -214,7 +216,7 @@ class Monitor(object):
elif event_tag == TAG_SERIAL_FLUSH:
self.handle_serial_input(data, finalize_line=True)
else:
raise RuntimeError('Bad event data %r' % ((event_tag,data),))
raise RuntimeError('Bad event data %r' % ((event_tag, data),))
except KeyboardInterrupt:
try:
yellow_print('To exit from IDF monitor please use \"Ctrl+]\"')
@ -231,7 +233,7 @@ class Monitor(object):
try:
self.console_reader.stop()
self.serial_reader.stop()
self.stop_logging()
self.logger.stop_logging()
# Cancelling _invoke_processing_last_line_timer is not
# important here because receiving empty data doesn't matter.
self._invoke_processing_last_line_timer = None
@ -239,6 +241,11 @@ class Monitor(object):
pass
normal_print('\n')
def check_gdb_stub_and_run(self, line): # type: (bytes) -> None
if self.gdb_helper.check_gdb_stub_trigger(line):
with self: # disable console control
self.gdb_helper.run_gdb()
def handle_serial_input(self, data, finalize_line=False):
# type: (bytes, bool) -> None
# Remove "+" after Continue command
@ -257,47 +264,53 @@ class Monitor(object):
# last part is not a full line
self._last_line_part = sp.pop()
for line in sp:
if line != b'':
if self._serial_check_exit and line == self.console_parser.exit_key.encode('latin-1'):
raise SerialStopException()
self.check_panic_decode_trigger(line)
self.check_coredump_trigger_before_print(line)
if self._force_line_print or self._line_matcher.match(line.decode(errors='ignore')):
self._print(line + b'\n')
self.handle_possible_pc_address_in_line(line)
self.check_coredump_trigger_after_print()
self.check_gdbstub_trigger(line)
self._force_line_print = False
if line == b'':
continue
if self._serial_check_exit and line == self.console_parser.exit_key.encode('latin-1'):
raise SerialStopException()
self.check_panic_decode_trigger(line)
self.check_coredump_trigger_before_print(line)
if self._force_line_print or self._line_matcher.match(line.decode(errors='ignore')):
self.logger.print(line + b'\n')
self.handle_possible_pc_address_in_line(line)
self.check_coredump_trigger_after_print()
self.check_gdb_stub_and_run(line)
self._force_line_print = False
# Now we have the last part (incomplete line) in _last_line_part. By
# default we don't touch it and just wait for the arrival of the rest
# of the line. But after some time when we didn't received it we need
# to make a decision.
if self._last_line_part != b'':
if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part.decode(errors='ignore'))):
self._force_line_print = True
self._print(self._last_line_part)
self.handle_possible_pc_address_in_line(self._last_line_part)
self.check_gdbstub_trigger(self._last_line_part)
# It is possible that the incomplete line cuts in half the PC
# address. A small buffer is kept and will be used the next time
# handle_possible_pc_address_in_line is invoked to avoid this problem.
# MATCH_PCADDR matches 10 character long addresses. Therefore, we
# keep the last 9 characters.
self._pc_address_buffer = self._last_line_part[-9:]
# GDB sequence can be cut in half also. GDB sequence is 7
# characters long, therefore, we save the last 6 characters.
self._gdb_buffer = self._last_line_part[-6:]
self._last_line_part = b''
force_print_or_matched = any((
self._force_line_print,
(finalize_line and self._line_matcher.match(self._last_line_part.decode(errors='ignore')))
))
if self._last_line_part != b'' and force_print_or_matched:
self._force_line_print = True
self.logger.print(self._last_line_part)
self.handle_possible_pc_address_in_line(self._last_line_part)
self.check_gdb_stub_and_run(self._last_line_part)
# It is possible that the incomplete line cuts in half the PC
# address. A small buffer is kept and will be used the next time
# handle_possible_pc_address_in_line is invoked to avoid this problem.
# MATCH_PCADDR matches 10 character long addresses. Therefore, we
# keep the last 9 characters.
self._pc_address_buffer = self._last_line_part[-9:]
# GDB sequence can be cut in half also. GDB sequence is 7
# characters long, therefore, we save the last 6 characters.
self.gdb_helper.gdb_buffer = self._last_line_part[-6:]
self._last_line_part = b''
# else: keeping _last_line_part and it will be processed the next time
# handle_serial_input is invoked
def handle_possible_pc_address_in_line(self, line):
# type: (bytes) -> None
def handle_possible_pc_address_in_line(self, line): # type: (bytes) -> None
line = self._pc_address_buffer + line
self._pc_address_buffer = b''
if self.enable_address_decoding:
for m in re.finditer(MATCH_PCADDR, line.decode(errors='ignore')):
self.lookup_pc_address(m.group())
if not self.enable_address_decoding:
return
for m in re.finditer(MATCH_PCADDR, line.decode(errors='ignore')):
translation = lookup_pc_address(m.group(), self.toolchain_prefix, self.elf_file)
if translation:
self.logger.print(translation, console_printer=yellow_print)
def __enter__(self):
# type: () -> None
@ -308,29 +321,9 @@ class Monitor(object):
def __exit__(self, *args, **kwargs): # type: ignore
""" Use 'with self' to temporarily disable monitoring behaviour """
self.console_reader.start()
self.serial_reader.gdb_exit = self.gdb_exit # write gdb_exit flag
self.serial_reader.gdb_exit = self.gdb_helper.gdb_exit # write gdb_exit flag
self.serial_reader.start()
def prompt_next_action(self, reason): # type: (str) -> None
self.console.setup() # set up console to trap input characters
try:
red_print('--- {}'.format(reason))
red_print(self.console_parser.get_next_action_text())
k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc.
while k == CTRL_T:
k = self.console.getkey()
finally:
self.console.cleanup()
ret = self.console_parser.parse_next_action_key(k)
if ret is not None:
cmd = ret[1]
if cmd == CMD_STOP:
# the stop command should be handled last
self.event_queue.put(ret)
else:
self.cmd_queue.put(ret)
def run_make(self, target): # type: (str) -> None
with self:
if isinstance(self.make, list):
@ -344,43 +337,10 @@ class Monitor(object):
except KeyboardInterrupt:
p.wait()
if p.returncode != 0:
self.prompt_next_action('Build failed')
prompt_next_action('Build failed', self.console, self.console_parser, self.event_queue,
self.cmd_queue)
else:
self.output_enable(True)
def lookup_pc_address(self, pc_addr): # type: (str) -> None
cmd = ['%saddr2line' % self.toolchain_prefix,
'-pfiaC', '-e', self.elf_file, pc_addr]
try:
translation = subprocess.check_output(cmd, cwd='.')
if b'?? ??:0' not in translation:
self._print(translation.decode(), console_printer=yellow_print)
except OSError as e:
red_print('%s: %s' % (' '.join(cmd), e))
def check_gdbstub_trigger(self, line): # type: (bytes) -> None
line = self._gdb_buffer + line
self._gdb_buffer = b''
m = re.search(b'\\$(T..)#(..)', line) # look for a gdb "reason" for a break
if m is not None:
try:
chsum = sum(ord(bytes([p])) for p in m.group(1)) & 0xFF
calc_chsum = int(m.group(2), 16)
except ValueError:
return # payload wasn't valid hex digits
if chsum == calc_chsum:
if self.websocket_client:
yellow_print('Communicating through WebSocket')
self.websocket_client.send({'event': 'gdb_stub',
'port': self.serial.port,
'prog': self.elf_file})
yellow_print('Waiting for debug finished event')
self.websocket_client.wait([('event', 'debug_finished')])
yellow_print('Communications through WebSocket is finished')
else:
self.run_gdb()
else:
red_print('Malformed gdb message... calculated checksum %02x received %02x' % (chsum, calc_chsum))
self.logger.output_enabled = True
def check_coredump_trigger_before_print(self, line): # type: (bytes) -> None
if self._decode_coredumps == COREDUMP_DECODE_DISABLE:
@ -395,7 +355,7 @@ class Monitor(object):
yellow_print('Core dump started (further output muted)')
self._reading_coredump = COREDUMP_READING
self._coredump_buffer = b''
self._output_enabled = False
self.logger.output_enabled = False
return
if COREDUMP_UART_END in line:
@ -410,16 +370,16 @@ class Monitor(object):
self._coredump_buffer += line.replace(b'\r', b'') + b'\n'
new_buffer_len_kb = len(self._coredump_buffer) // kb
if new_buffer_len_kb > buffer_len_kb:
yellow_print('Received %3d kB...' % (new_buffer_len_kb), newline='\r')
yellow_print('Received %3d kB...' % new_buffer_len_kb, newline='\r')
def check_coredump_trigger_after_print(self): # type: () -> None
if self._decode_coredumps == COREDUMP_DECODE_DISABLE:
return
# Re-enable output after the last line of core dump has been consumed
if not self._output_enabled and self._reading_coredump == COREDUMP_DONE:
if not self.logger.output_enabled and self._reading_coredump == COREDUMP_DONE:
self._reading_coredump = COREDUMP_IDLE
self._output_enabled = True
self.logger.output_enabled = True
self._coredump_buffer = b''
def process_coredump(self): # type: () -> None
@ -436,7 +396,7 @@ class Monitor(object):
coredump_file.flush()
if self.websocket_client:
self._output_enabled = True
self.logger.output_enabled = True
yellow_print('Communicating through WebSocket')
self.websocket_client.send({'event': 'coredump',
'file': coredump_file.name,
@ -453,14 +413,14 @@ class Monitor(object):
self.elf_file
]
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
self._output_enabled = True
self._print(output)
self._output_enabled = False # Will be reenabled in check_coredump_trigger_after_print
self.logger.output_enabled = True
self.logger.print(output)
self.logger.output_enabled = False # Will be reenabled in check_coredump_trigger_after_print
except subprocess.CalledProcessError as e:
yellow_print('Failed to run espcoredump script: {}\n{}\n\n'.format(e, e.output))
self._output_enabled = True
self._print(COREDUMP_UART_START + b'\n')
self._print(self._coredump_buffer)
self.logger.output_enabled = True
self.logger.print(COREDUMP_UART_START + b'\n')
self.logger.print(self._coredump_buffer)
# end line will be printed in handle_serial_input
finally:
if coredump_file is not None:
@ -478,155 +438,17 @@ class Monitor(object):
yellow_print('Stack dump detected')
if self._reading_panic == PANIC_READING and PANIC_STACK_DUMP in line:
self._output_enabled = False
self.logger.output_enabled = False
if self._reading_panic == PANIC_READING:
self._panic_buffer += line.replace(b'\r', b'') + b'\n'
if self._reading_panic == PANIC_READING and PANIC_END in line:
self._reading_panic = PANIC_IDLE
self._output_enabled = True
self.process_panic_output(self._panic_buffer)
self.logger.output_enabled = True
self.gdb_helper.process_panic_output(self._panic_buffer, self.logger, self.target)
self._panic_buffer = b''
def process_panic_output(self, panic_output): # type: (bytes) -> None
panic_output_decode_script = os.path.join(os.path.dirname(__file__), '..', 'tools', 'gdb_panic_server.py')
panic_output_file = None
try:
# On Windows, the temporary file can't be read unless it is closed.
# Set delete=False and delete the file manually later.
with tempfile.NamedTemporaryFile(mode='wb', delete=False) as panic_output_file:
panic_output_file.write(panic_output)
panic_output_file.flush()
cmd = [self.toolchain_prefix + 'gdb',
'--batch', '-n',
self.elf_file,
'-ex', "target remote | \"{python}\" \"{script}\" --target {target} \"{output_file}\""
.format(python=sys.executable,
script=panic_output_decode_script,
target=self.target,
output_file=panic_output_file.name),
'-ex', 'bt']
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
yellow_print('\nBacktrace:\n\n')
self._print(output)
except subprocess.CalledProcessError as e:
yellow_print('Failed to run gdb_panic_server.py script: {}\n{}\n\n'.format(e, e.output))
self._print(panic_output)
finally:
if panic_output_file is not None:
try:
os.unlink(panic_output_file.name)
except OSError as e:
yellow_print('Couldn\'t remove temporary panic output file ({})'.format(e))
def run_gdb(self): # type: () -> None
with self: # disable console control
normal_print('')
try:
cmd = ['%sgdb' % self.toolchain_prefix,
'-ex', 'set serial baud %d' % self.serial.baudrate,
'-ex', 'target remote %s' % self.serial.port,
self.elf_file]
# Here we handling GDB as a process
# Open GDB process
try:
process = subprocess.Popen(cmd, cwd='.')
except KeyboardInterrupt:
pass
# We ignore Ctrl+C interrupt form external process abd wait responce util GDB will be finished.
while True:
try:
process.wait()
break
except KeyboardInterrupt:
pass # We ignore the Ctrl+C
self.gdb_exit = True
except OSError as e:
red_print('%s: %s' % (' '.join(cmd), e))
except KeyboardInterrupt:
pass # happens on Windows, maybe other OSes
finally:
try:
# on Linux, maybe other OSes, gdb sometimes seems to be alive even after wait() returns...
process.terminate()
except Exception:
pass
try:
# also on Linux, maybe other OSes, gdb sometimes exits uncleanly and breaks the tty mode
subprocess.call(['stty', 'sane'])
except Exception:
pass # don't care if there's no stty, we tried...
def output_enable(self, enable): # type: (bool) -> None
self._output_enabled = enable
def output_toggle(self): # type: () -> None
self._output_enabled = not self._output_enabled
yellow_print('\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.'.format(
self._output_enabled))
def toggle_logging(self): # type: () -> None
if self._log_file:
self.stop_logging()
else:
self.start_logging()
def toggle_timestamps(self): # type: () -> None
self._timestamps = not self._timestamps
def start_logging(self): # type: () -> None
if not self._log_file:
name = 'log.{}.{}.txt'.format(os.path.splitext(os.path.basename(self.elf_file))[0],
datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
try:
self._log_file = open(name, 'wb+')
yellow_print('\nLogging is enabled into file {}'.format(name))
except Exception as e:
red_print('\nLog file {} cannot be created: {}'.format(name, e))
def stop_logging(self): # type: () -> None
if self._log_file:
try:
name = self._log_file.name
self._log_file.close()
yellow_print('\nLogging is disabled and file {} has been closed'.format(name))
except Exception as e:
red_print('\nLog file cannot be closed: {}'.format(e))
finally:
self._log_file = None
def _print(self, string, console_printer=None): # type: (AnyStr, Optional[Callable]) -> None
if console_printer is None:
console_printer = self.console.write_bytes
if self._timestamps and (self._output_enabled or self._log_file):
t = datetime.datetime.now().strftime(self._timestamp_format)
# "string" is not guaranteed to be a full line. Timestamps should be only at the beginning of lines.
if isinstance(string, type(u'')):
search_patt = '\n'
replacement = '\n' + t + ' '
else:
search_patt = b'\n' # type: ignore
replacement = b'\n' + t.encode('ascii') + b' ' # type: ignore
string = string.replace(search_patt, replacement)
if self._output_enabled:
console_printer(string)
if self._log_file:
try:
if isinstance(string, type(u'')):
string = string.encode() # type: ignore
self._log_file.write(string) # type: ignore
except Exception as e:
red_print('\nCannot write to file: {}'.format(e))
# don't fill-up the screen with the previous errors (probably consequent prints would fail also)
self.stop_logging()
def handle_commands(self, cmd, chip): # type: (int, str) -> None
config = get_chip_config(chip)
reset_delay = config['reset']
@ -645,17 +467,18 @@ class Monitor(object):
time.sleep(reset_delay)
self.serial.setRTS(high)
self.serial.setDTR(self.serial.dtr) # usbser.sys workaround
self.output_enable(low)
self.logger.output_enabled = True
elif cmd == CMD_MAKE:
self.run_make('encrypted-flash' if self.encrypted else 'flash')
elif cmd == CMD_APP_FLASH:
self.run_make('encrypted-app-flash' if self.encrypted else 'app-flash')
elif cmd == CMD_OUTPUT_TOGGLE:
self.output_toggle()
self.logger.output_toggle()
elif cmd == CMD_TOGGLE_LOGGING:
self.toggle_logging()
self.logger.toggle_logging()
elif cmd == CMD_TOGGLE_TIMESTAMPS:
self.toggle_timestamps()
self.logger.toggle_timestamps()
self.logger.toggle_logging()
elif cmd == CMD_ENTER_BOOT:
self.serial.setDTR(high) # IO0=HIGH
self.serial.setRTS(low) # EN=LOW, chip in reset

View File

@ -1,5 +1,16 @@
import re
from serial.tools import miniterm
from .console_parser import ConsoleParser
from .constants import CMD_STOP, CTRL_T
from .output_helpers import red_print
try:
import queue # noqa
except ImportError:
import Queue as queue # type: ignore # noqa
# regex matches an potential PC value (0x4xxxxxxx)
MATCH_PCADDR = re.compile(r'0x4[0-9a-f]{7}', re.IGNORECASE)
@ -33,3 +44,25 @@ PANIC_READING = 1
# panic handler decoding options
PANIC_DECODE_DISABLE = 'disable'
PANIC_DECODE_BACKTRACE = 'backtrace'
def prompt_next_action(reason, console, console_parser, event_queue, cmd_queue):
# type: (str, miniterm.Console, ConsoleParser, queue.Queue, queue.Queue) -> None
console.setup() # set up console to trap input characters
try:
red_print('--- {}'.format(reason))
red_print(console_parser.get_next_action_text())
k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc.
while k == CTRL_T:
k = console.getkey()
finally:
console.cleanup()
ret = console_parser.parse_next_action_key(k)
if ret is not None:
cmd = ret[1]
if cmd == CMD_STOP:
# the stop command should be handled last
event_queue.put(ret)
else:
cmd_queue.put(ret)

View File

@ -19,7 +19,7 @@ try:
except ImportError:
pass
import serial.tools.miniterm as miniterm
from serial.tools import miniterm
from .constants import (CMD_APP_FLASH, CMD_ENTER_BOOT, CMD_MAKE, CMD_OUTPUT_TOGGLE, CMD_RESET, CMD_STOP,
CMD_TOGGLE_LOGGING, CMD_TOGGLE_TIMESTAMPS, CTRL_A, CTRL_F, CTRL_H, CTRL_I, CTRL_L, CTRL_P,

View File

@ -0,0 +1,131 @@
import os
import re
import subprocess
import sys
import tempfile
from .output_helpers import Logger, normal_print, red_print, yellow_print
from .web_socket_client import WebSocketClient
class GDBHelper:
def __init__(self, toolchain_prefix, websocket_client, elf_file, port, baud_rate):
# type: (str, WebSocketClient, str, int, int) -> None
self._gdb_buffer = b'' # type: bytes
self._gdb_exit = False # type: bool
self.toolchain_prefix = toolchain_prefix
self.websocket_client = websocket_client
self.elf_file = elf_file
self.port = port
self.baud_rate = baud_rate
@property
def gdb_buffer(self): # type: () -> bytes
return self._gdb_buffer
@gdb_buffer.setter
def gdb_buffer(self, value): # type: (bytes) -> None
self._gdb_buffer = value
@property
def gdb_exit(self): # type: () -> bool
return self._gdb_exit
@gdb_exit.setter
def gdb_exit(self, value): # type: (bool) -> None
self._gdb_exit = value
def run_gdb(self):
# type: () -> None
normal_print('')
try:
cmd = ['%sgdb' % self.toolchain_prefix,
'-ex', 'set serial baud %d' % self.baud_rate,
'-ex', 'target remote %s' % self.port,
self.elf_file]
# Here we handling GDB as a process
# Open GDB process
try:
process = subprocess.Popen(cmd, cwd='.')
except KeyboardInterrupt:
pass
# We ignore Ctrl+C interrupt form external process abd wait response util GDB will be finished.
while True:
try:
process.wait()
break
except KeyboardInterrupt:
pass # We ignore the Ctrl+C
self.gdb_exit = True
except OSError as e:
red_print('%s: %s' % (' '.join(cmd), e))
except KeyboardInterrupt:
pass # happens on Windows, maybe other OSes
finally:
try:
# on Linux, maybe other OSes, gdb sometimes seems to be alive even after wait() returns...
process.terminate()
except Exception: # noqa
pass
try:
# also on Linux, maybe other OSes, gdb sometimes exits uncleanly and breaks the tty mode
subprocess.call(['stty', 'sane'])
except Exception: # noqa
pass # don't care if there's no stty, we tried...
def check_gdb_stub_trigger(self, line):
# type: (bytes) -> bool
line = self.gdb_buffer + line
self.gdb_buffer = b''
m = re.search(b'\\$(T..)#(..)', line) # look for a gdb "reason" for a break
if m is not None:
try:
chsum = sum(ord(bytes([p])) for p in m.group(1)) & 0xFF
calc_chsum = int(m.group(2), 16)
except ValueError: # payload wasn't valid hex digits
return False
if chsum == calc_chsum:
if self.websocket_client:
yellow_print('Communicating through WebSocket')
self.websocket_client.send({'event': 'gdb_stub',
'port': self.port,
'prog': self.elf_file})
yellow_print('Waiting for debug finished event')
self.websocket_client.wait([('event', 'debug_finished')])
yellow_print('Communications through WebSocket is finished')
else:
return True
else:
red_print('Malformed gdb message... calculated checksum %02x received %02x' % (chsum, calc_chsum))
return False
def process_panic_output(self, panic_output, logger, target): # type: (bytes, Logger, str) -> None
panic_output_decode_script = os.path.join(os.path.dirname(__file__), '..', 'gdb_panic_server.py')
panic_output_file = None
try:
# On Windows, the temporary file can't be read unless it is closed.
# Set delete=False and delete the file manually later.
with tempfile.NamedTemporaryFile(mode='wb', delete=False) as panic_output_file:
panic_output_file.write(panic_output)
panic_output_file.flush()
cmd = [self.toolchain_prefix + 'gdb',
'--batch', '-n',
self.elf_file,
'-ex', "target remote | \"{python}\" \"{script}\" --target {target} \"{output_file}\""
.format(python=sys.executable,
script=panic_output_decode_script,
target=target,
output_file=panic_output_file.name),
'-ex', 'bt']
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
yellow_print('\nBacktrace:\n\n')
logger.print(output) # noqa: E999
except subprocess.CalledProcessError as e:
yellow_print('Failed to run gdb_panic_server.py script: {}\n{}\n\n'.format(e, e.output))
logger.print(panic_output)
finally:
if panic_output_file is not None:
try:
os.unlink(panic_output_file.name)
except OSError as e:
yellow_print('Couldn\'t remove temporary panic output file ({})'.format(e))

View File

@ -13,35 +13,137 @@
# limitations under the License.
import datetime
import os
import subprocess
import sys
from serial.tools import miniterm
try:
from typing import Optional
from typing import BinaryIO, Callable, Optional, Union # noqa
except ImportError:
pass
# ANSI terminal codes (if changed, regular expressions in LineMatcher need to be udpated)
# ANSI terminal codes (if changed, regular expressions in LineMatcher need to be updated)
ANSI_RED = '\033[1;31m'
ANSI_YELLOW = '\033[0;33m'
ANSI_NORMAL = '\033[0m'
def color_print(message, color, newline='\n'):
# type: (str, str, Optional[str]) -> None
def color_print(message, color, newline='\n'): # type: (str, str, Optional[str]) -> None
""" Print a message to stderr with colored highlighting """
sys.stderr.write('%s%s%s%s' % (color, message, ANSI_NORMAL, newline))
def normal_print(message):
# type: (str) -> None
def normal_print(message): # type: (str) -> None
sys.stderr.write(ANSI_NORMAL + message)
def yellow_print(message, newline='\n'):
# type: (str, Optional[str]) -> None
def yellow_print(message, newline='\n'): # type: (str, Optional[str]) -> None
color_print(message, ANSI_YELLOW, newline)
def red_print(message, newline='\n'):
# type: (str, Optional[str]) -> None
def red_print(message, newline='\n'): # type: (str, Optional[str]) -> None
color_print(message, ANSI_RED, newline)
def lookup_pc_address(pc_addr, toolchain_prefix, elf_file): # type: (str, str, str) -> Optional[str]
cmd = ['%saddr2line' % toolchain_prefix, '-pfiaC', '-e', elf_file, pc_addr]
try:
translation = subprocess.check_output(cmd, cwd='.')
if b'?? ??:0' not in translation:
return translation.decode()
except OSError as e:
red_print('%s: %s' % (' '.join(cmd), e))
return None
class Logger:
def __init__(self, elf_file, console, timestamps, timestamp_format):
# type: (str, miniterm.Console, bool, str) -> None
self.log_file = None # type: Optional[BinaryIO]
self._output_enabled = True # type: bool
self.elf_file = elf_file
self.console = console
self.timestamps = timestamps
self.timestamp_format = timestamp_format
@property
def output_enabled(self): # type: () -> bool
return self._output_enabled
@output_enabled.setter
def output_enabled(self, value): # type: (bool) -> None
self._output_enabled = value
@property
def log_file(self): # type: () -> Optional[BinaryIO]
return self._log_file
@log_file.setter
def log_file(self, value): # type: (Optional[BinaryIO]) -> None
self._log_file = value
def toggle_logging(self): # type: () -> None
if self._log_file:
self.stop_logging()
else:
self.start_logging()
def toggle_timestamps(self): # type: () -> None
self.timestamps = not self.timestamps
def start_logging(self): # type: () -> None
if not self._log_file:
name = 'log.{}.{}.txt'.format(os.path.splitext(os.path.basename(self.elf_file))[0],
datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
try:
self.log_file = open(name, 'wb+')
yellow_print('\nLogging is enabled into file {}'.format(name))
except Exception as e:
red_print('\nLog file {} cannot be created: {}'.format(name, e))
def stop_logging(self): # type: () -> None
if self._log_file:
try:
name = self._log_file.name
self._log_file.close()
yellow_print('\nLogging is disabled and file {} has been closed'.format(name))
except Exception as e:
red_print('\nLog file cannot be closed: {}'.format(e))
finally:
self._log_file = None
def print(self, string, console_printer=None): # noqa: E999
# type: (Union[str, bytes], Optional[Callable]) -> None
if console_printer is None:
console_printer = self.console.write_bytes
if self.timestamps and (self._output_enabled or self._log_file):
t = datetime.datetime.now().strftime(self.timestamp_format)
# "string" is not guaranteed to be a full line. Timestamps should be only at the beginning of lines.
if isinstance(string, type(u'')):
search_patt = '\n'
replacement = '\n' + t + ' '
else:
search_patt = b'\n' # type: ignore
replacement = b'\n' + t.encode('ascii') + b' ' # type: ignore
string = string.replace(search_patt, replacement) # type: ignore
if self._output_enabled:
console_printer(string)
if self._log_file:
try:
if isinstance(string, type(u'')):
string = string.encode() # type: ignore
self._log_file.write(string) # type: ignore
except Exception as e:
red_print('\nCannot write to file: {}'.format(e))
# don't fill-up the screen with the previous errors (probably consequent prints would fail also)
self.stop_logging()
def output_toggle(self): # type: () -> None
self.output_enabled = not self.output_enabled
yellow_print('\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.'.format(
self.output_enabled))