Moved coredump utils out of idf_monitor

This commit is contained in:
Martin Gaňo 2021-05-14 11:20:38 +02:00
parent d5845abe62
commit 5874fceb0d
7 changed files with 196 additions and 176 deletions

View File

@ -46,17 +46,16 @@ try:
except ImportError:
pass
from idf_monitor_base import (COREDUMP_DECODE_DISABLE, COREDUMP_DECODE_INFO, COREDUMP_DONE, COREDUMP_IDLE,
COREDUMP_READING, COREDUMP_UART_END, COREDUMP_UART_PROMPT, COREDUMP_UART_START,
DEFAULT_PRINT_FILTER, DEFAULT_TOOLCHAIN_PREFIX, MATCH_PCADDR, PANIC_DECODE_BACKTRACE,
PANIC_DECODE_DISABLE, PANIC_END, PANIC_IDLE, PANIC_READING, PANIC_STACK_DUMP,
PANIC_START, prompt_next_action)
from idf_monitor_base.chip_specific_config import get_chip_config
from idf_monitor_base.console_parser import ConsoleParser
from idf_monitor_base.console_parser import ConsoleParser, prompt_next_action
from idf_monitor_base.console_reader import ConsoleReader
from idf_monitor_base.constants import (CMD_APP_FLASH, CMD_ENTER_BOOT, CMD_MAKE, CMD_OUTPUT_TOGGLE, CMD_RESET,
CMD_STOP, CMD_TOGGLE_LOGGING, CMD_TOGGLE_TIMESTAMPS, CTRL_H, TAG_CMD, TAG_KEY,
TAG_SERIAL, TAG_SERIAL_FLUSH)
CMD_STOP, CMD_TOGGLE_LOGGING, CMD_TOGGLE_TIMESTAMPS, CTRL_H,
DEFAULT_PRINT_FILTER, DEFAULT_TOOLCHAIN_PREFIX, MATCH_PCADDR,
PANIC_DECODE_BACKTRACE, PANIC_DECODE_DISABLE, PANIC_END, PANIC_IDLE,
PANIC_READING, PANIC_STACK_DUMP, PANIC_START, TAG_CMD, TAG_KEY, TAG_SERIAL,
TAG_SERIAL_FLUSH)
from idf_monitor_base.coredump import COREDUMP_DECODE_DISABLE, COREDUMP_DECODE_INFO, CoreDump
from idf_monitor_base.exceptions import SerialStopException
from idf_monitor_base.gdbhelper import GDBHelper
from idf_monitor_base.line_matcher import LineMatcher
@ -72,7 +71,6 @@ except ImportError:
import shlex
import sys
import tempfile
import serial
import serial.tools.list_ports
@ -145,16 +143,15 @@ class Monitor(object):
self._invoke_processing_last_line_timer = None # type: Optional[threading.Timer]
self._force_line_print = False
self._serial_check_exit = socket_mode
self._decode_coredumps = decode_coredumps
self._reading_coredump = COREDUMP_IDLE
self._coredump_buffer = b''
self._decode_panic = decode_panic
self._reading_panic = PANIC_IDLE
self._panic_buffer = b''
self.start_cmd_sent = False
self.gdb_helper = GDBHelper(self.toolchain_prefix, self.websocket_client, self.elf_file, self.serial.port,
self.serial.baudrate)
self.logger = Logger(self.elf_file, self.console, timestamps, timestamp_format)
self.coredump = CoreDump(decode_coredumps, self.event_queue, self.logger, self.websocket_client, self.elf_file)
def invoke_processing_last_line(self):
# type: () -> None
@ -208,7 +205,7 @@ class Monitor(object):
self._invoke_processing_last_line_timer = threading.Timer(0.1,
self.invoke_processing_last_line)
self._invoke_processing_last_line_timer.start()
# If no futher data is received in the next short period
# If no further data is received in the next short period
# of time then the _invoke_processing_last_line_timer
# generates an event which will result in the finishing of
# the last line. This is fix for handling lines sent
@ -269,11 +266,10 @@ class Monitor(object):
if self._serial_check_exit and line == self.console_parser.exit_key.encode('latin-1'):
raise SerialStopException()
self.check_panic_decode_trigger(line)
self.check_coredump_trigger_before_print(line)
if self._force_line_print or self._line_matcher.match(line.decode(errors='ignore')):
self.logger.print(line + b'\n')
self.handle_possible_pc_address_in_line(line)
self.check_coredump_trigger_after_print()
with self.coredump.check(line):
if self._force_line_print or self._line_matcher.match(line.decode(errors='ignore')):
self.logger.print(line + b'\n')
self.handle_possible_pc_address_in_line(line)
self.check_gdb_stub_and_run(line)
self._force_line_print = False
# Now we have the last part (incomplete line) in _last_line_part. By
@ -342,93 +338,6 @@ class Monitor(object):
else:
self.logger.output_enabled = True
def check_coredump_trigger_before_print(self, line): # type: (bytes) -> None
if self._decode_coredumps == COREDUMP_DECODE_DISABLE:
return
if COREDUMP_UART_PROMPT in line:
yellow_print('Initiating core dump!')
self.event_queue.put((TAG_KEY, '\n'))
return
if COREDUMP_UART_START in line:
yellow_print('Core dump started (further output muted)')
self._reading_coredump = COREDUMP_READING
self._coredump_buffer = b''
self.logger.output_enabled = False
return
if COREDUMP_UART_END in line:
self._reading_coredump = COREDUMP_DONE
yellow_print('\nCore dump finished!')
self.process_coredump()
return
if self._reading_coredump == COREDUMP_READING:
kb = 1024
buffer_len_kb = len(self._coredump_buffer) // kb
self._coredump_buffer += line.replace(b'\r', b'') + b'\n'
new_buffer_len_kb = len(self._coredump_buffer) // kb
if new_buffer_len_kb > buffer_len_kb:
yellow_print('Received %3d kB...' % new_buffer_len_kb, newline='\r')
def check_coredump_trigger_after_print(self): # type: () -> None
if self._decode_coredumps == COREDUMP_DECODE_DISABLE:
return
# Re-enable output after the last line of core dump has been consumed
if not self.logger.output_enabled and self._reading_coredump == COREDUMP_DONE:
self._reading_coredump = COREDUMP_IDLE
self.logger.output_enabled = True
self._coredump_buffer = b''
def process_coredump(self): # type: () -> None
if self._decode_coredumps != COREDUMP_DECODE_INFO:
raise NotImplementedError('process_coredump: %s not implemented' % self._decode_coredumps)
coredump_script = os.path.join(os.path.dirname(__file__), '..', 'components', 'espcoredump', 'espcoredump.py')
coredump_file = None
try:
# On Windows, the temporary file can't be read unless it is closed.
# Set delete=False and delete the file manually later.
with tempfile.NamedTemporaryFile(mode='wb', delete=False) as coredump_file:
coredump_file.write(self._coredump_buffer)
coredump_file.flush()
if self.websocket_client:
self.logger.output_enabled = True
yellow_print('Communicating through WebSocket')
self.websocket_client.send({'event': 'coredump',
'file': coredump_file.name,
'prog': self.elf_file})
yellow_print('Waiting for debug finished event')
self.websocket_client.wait([('event', 'debug_finished')])
yellow_print('Communications through WebSocket is finished')
else:
cmd = [sys.executable,
coredump_script,
'info_corefile',
'--core', coredump_file.name,
'--core-format', 'b64',
self.elf_file
]
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
self.logger.output_enabled = True
self.logger.print(output)
self.logger.output_enabled = False # Will be reenabled in check_coredump_trigger_after_print
except subprocess.CalledProcessError as e:
yellow_print('Failed to run espcoredump script: {}\n{}\n\n'.format(e, e.output))
self.logger.output_enabled = True
self.logger.print(COREDUMP_UART_START + b'\n')
self.logger.print(self._coredump_buffer)
# end line will be printed in handle_serial_input
finally:
if coredump_file is not None:
try:
os.unlink(coredump_file.name)
except OSError as e:
yellow_print('Couldn\'t remote temporary core dump file ({})'.format(e))
def check_panic_decode_trigger(self, line): # type: (bytes) -> None
if self._decode_panic == PANIC_DECODE_DISABLE:
return

View File

@ -1,68 +0,0 @@
import re
from serial.tools import miniterm
from .console_parser import ConsoleParser
from .constants import CMD_STOP, CTRL_T
from .output_helpers import red_print
try:
import queue # noqa
except ImportError:
import Queue as queue # type: ignore # noqa
# regex matches an potential PC value (0x4xxxxxxx)
MATCH_PCADDR = re.compile(r'0x4[0-9a-f]{7}', re.IGNORECASE)
DEFAULT_TOOLCHAIN_PREFIX = 'xtensa-esp32-elf-'
DEFAULT_PRINT_FILTER = ''
# coredump related messages
COREDUMP_UART_START = b'================= CORE DUMP START ================='
COREDUMP_UART_END = b'================= CORE DUMP END ================='
COREDUMP_UART_PROMPT = b'Press Enter to print core dump to UART...'
# coredump states
COREDUMP_IDLE = 0
COREDUMP_READING = 1
COREDUMP_DONE = 2
# coredump decoding options
COREDUMP_DECODE_DISABLE = 'disable'
COREDUMP_DECODE_INFO = 'info'
# panic handler related messages
PANIC_START = r'Core \s*\d+ register dump:'
PANIC_END = b'ELF file SHA256:'
PANIC_STACK_DUMP = b'Stack memory:'
# panic handler decoding states
PANIC_IDLE = 0
PANIC_READING = 1
# panic handler decoding options
PANIC_DECODE_DISABLE = 'disable'
PANIC_DECODE_BACKTRACE = 'backtrace'
def prompt_next_action(reason, console, console_parser, event_queue, cmd_queue):
# type: (str, miniterm.Console, ConsoleParser, queue.Queue, queue.Queue) -> None
console.setup() # set up console to trap input characters
try:
red_print('--- {}'.format(reason))
red_print(console_parser.get_next_action_text())
k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc.
while k == CTRL_T:
k = console.getkey()
finally:
console.cleanup()
ret = console_parser.parse_next_action_key(k)
if ret is not None:
cmd = ret[1]
if cmd == CMD_STOP:
# the stop command should be handled last
event_queue.put(ret)
else:
cmd_queue.put(ret)

View File

@ -19,6 +19,9 @@ try:
except ImportError:
pass
import queue
from serial.tools import miniterm
from .constants import (CMD_APP_FLASH, CMD_ENTER_BOOT, CMD_MAKE, CMD_OUTPUT_TOGGLE, CMD_RESET, CMD_STOP,
@ -29,6 +32,28 @@ from .output_helpers import red_print, yellow_print
key_description = miniterm.key_description
def prompt_next_action(reason, console, console_parser, event_queue, cmd_queue):
# type: (str, miniterm.Console, ConsoleParser, queue.Queue, queue.Queue) -> None
console.setup() # set up console to trap input characters
try:
red_print('--- {}'.format(reason))
red_print(console_parser.get_next_action_text())
k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc.
while k == CTRL_T:
k = console.getkey()
finally:
console.cleanup()
ret = console_parser.parse_next_action_key(k)
if ret is not None:
cmd = ret[1]
if cmd == CMD_STOP:
# the stop command should be handled last
event_queue.put(ret)
else:
cmd_queue.put(ret)
class ConsoleParser(object):
def __init__(self, eol='CRLF'): # type: (str) -> None

View File

@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
# Control-key characters
CTRL_A = '\x01'
CTRL_B = '\x02'
@ -43,3 +46,28 @@ TAG_SERIAL_FLUSH = 2
TAG_CMD = 3
__version__ = '1.1'
# paths to scripts
PANIC_OUTPUT_DECODE_SCRIPT = os.path.join(os.path.dirname(__file__), '..', 'gdb_panic_server.py')
COREDUMP_SCRIPT = os.path.join(os.path.dirname(__file__), '..', '..', 'components', 'espcoredump', 'espcoredump.py')
# regex matches an potential PC value (0x4xxxxxxx)
MATCH_PCADDR = re.compile(r'0x4[0-9a-f]{7}', re.IGNORECASE)
DEFAULT_TOOLCHAIN_PREFIX = 'xtensa-esp32-elf-'
DEFAULT_PRINT_FILTER = ''
# panic handler related messages
PANIC_START = r'Core \s*\d+ register dump:'
PANIC_END = b'ELF file SHA256:'
PANIC_STACK_DUMP = b'Stack memory:'
# panic handler decoding states
PANIC_IDLE = 0
PANIC_READING = 1
# panic handler decoding options
PANIC_DECODE_DISABLE = 'disable'
PANIC_DECODE_BACKTRACE = 'backtrace'

View File

@ -0,0 +1,127 @@
import os
import queue
import subprocess
import sys
import tempfile
from contextlib import contextmanager
from typing import Generator
from .constants import COREDUMP_SCRIPT, TAG_KEY
from .output_helpers import Logger, yellow_print
from .web_socket_client import WebSocketClient
# coredump related messages
COREDUMP_UART_START = b'================= CORE DUMP START ================='
COREDUMP_UART_END = b'================= CORE DUMP END ================='
COREDUMP_UART_PROMPT = b'Press Enter to print core dump to UART...'
# coredump states
COREDUMP_IDLE = 0
COREDUMP_READING = 1
COREDUMP_DONE = 2
# coredump decoding options
COREDUMP_DECODE_DISABLE = 'disable'
COREDUMP_DECODE_INFO = 'info'
class CoreDump:
def __init__(self, decode_coredumps, event_queue, logger, websocket_client, elf_file):
# type: (str, queue.Queue, Logger, WebSocketClient, str) -> None
self._coredump_buffer = b''
self._decode_coredumps = decode_coredumps
self.event_queue = event_queue
self._reading_coredump = COREDUMP_IDLE
self.logger = logger
self.websocket_client = websocket_client
self.elf_file = elf_file
def _process_coredump(self): # type: () -> None
if self._decode_coredumps != COREDUMP_DECODE_INFO:
raise NotImplementedError('process_coredump: %s not implemented' % self._decode_coredumps)
coredump_file = None
try:
# On Windows, the temporary file can't be read unless it is closed.
# Set delete=False and delete the file manually later.
with tempfile.NamedTemporaryFile(mode='wb', delete=False) as coredump_file:
coredump_file.write(self._coredump_buffer)
coredump_file.flush()
if self.websocket_client:
self.logger.output_enabled = True
yellow_print('Communicating through WebSocket')
self.websocket_client.send({'event': 'coredump',
'file': coredump_file.name,
'prog': self.elf_file})
yellow_print('Waiting for debug finished event')
self.websocket_client.wait([('event', 'debug_finished')])
yellow_print('Communications through WebSocket is finished')
else:
cmd = [sys.executable,
COREDUMP_SCRIPT,
'info_corefile',
'--core', coredump_file.name,
'--core-format', 'b64',
self.elf_file
]
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
self.logger.output_enabled = True
self.logger.print(output) # noqa: E999
self.logger.output_enabled = False # Will be reenabled in check_coredump_trigger_after_print
except subprocess.CalledProcessError as e:
yellow_print('Failed to run espcoredump script: {}\n{}\n\n'.format(e, e.output))
self.logger.output_enabled = True
self.logger.print(COREDUMP_UART_START + b'\n')
self.logger.print(self._coredump_buffer)
# end line will be printed in handle_serial_input
finally:
if coredump_file is not None:
try:
os.unlink(coredump_file.name)
except OSError as e:
yellow_print('Couldn\'t remote temporary core dump file ({})'.format(e))
def _check_coredump_trigger_before_print(self, line): # type: (bytes) -> None
if self._decode_coredumps == COREDUMP_DECODE_DISABLE:
return
if COREDUMP_UART_PROMPT in line:
yellow_print('Initiating core dump!')
self.event_queue.put((TAG_KEY, '\n'))
return
if COREDUMP_UART_START in line:
yellow_print('Core dump started (further output muted)')
self._reading_coredump = COREDUMP_READING
self._coredump_buffer = b''
self.logger.output_enabled = False
return
if COREDUMP_UART_END in line:
self._reading_coredump = COREDUMP_DONE
yellow_print('\nCore dump finished!')
self._process_coredump()
return
if self._reading_coredump == COREDUMP_READING:
kb = 1024
buffer_len_kb = len(self._coredump_buffer) // kb
self._coredump_buffer += line.replace(b'\r', b'') + b'\n'
new_buffer_len_kb = len(self._coredump_buffer) // kb
if new_buffer_len_kb > buffer_len_kb:
yellow_print('Received %3d kB...' % new_buffer_len_kb, newline='\r')
def _check_coredump_trigger_after_print(self): # type: () -> None
if self._decode_coredumps == COREDUMP_DECODE_DISABLE:
return
# Re-enable output after the last line of core dump has been consumed
if not self.logger.output_enabled and self._reading_coredump == COREDUMP_DONE:
self._reading_coredump = COREDUMP_IDLE
self.logger.output_enabled = True
self._coredump_buffer = b''
@contextmanager
def check(self, line): # type: (bytes) -> Generator
self._check_coredump_trigger_before_print(line)
try:
yield
finally:
self._check_coredump_trigger_after_print()

View File

@ -4,6 +4,7 @@ import subprocess
import sys
import tempfile
from .constants import PANIC_OUTPUT_DECODE_SCRIPT
from .output_helpers import Logger, normal_print, red_print, yellow_print
from .web_socket_client import WebSocketClient
@ -100,7 +101,6 @@ class GDBHelper:
return False
def process_panic_output(self, panic_output, logger, target): # type: (bytes, Logger, str) -> None
panic_output_decode_script = os.path.join(os.path.dirname(__file__), '..', 'gdb_panic_server.py')
panic_output_file = None
try:
# On Windows, the temporary file can't be read unless it is closed.
@ -113,7 +113,7 @@ class GDBHelper:
self.elf_file,
'-ex', "target remote | \"{python}\" \"{script}\" --target {target} \"{output_file}\""
.format(python=sys.executable,
script=panic_output_decode_script,
script=PANIC_OUTPUT_DECODE_SCRIPT,
target=target,
output_file=panic_output_file.name),
'-ex', 'bt']

View File

@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import os
import subprocess