esp-idf/tools/idf_monitor_base/console_reader.py

77 lines
3.0 KiB
Python

# SPDX-FileCopyrightText: 2015-2022 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import os
import queue
import time
from serial.tools.miniterm import Console
from .console_parser import ConsoleParser
from .constants import CMD_STOP, TAG_CMD
from .stoppable_thread import StoppableThread
class ConsoleReader(StoppableThread):
""" Read input keys from the console and push them to the queue,
until stopped.
"""
def __init__(self, console, event_queue, cmd_queue, parser, test_mode):
# type: (Console, queue.Queue, queue.Queue, ConsoleParser, bool) -> None
super(ConsoleReader, self).__init__()
self.console = console
self.event_queue = event_queue
self.cmd_queue = cmd_queue
self.parser = parser
self.test_mode = test_mode
def run(self):
# type: () -> None
self.console.setup()
if os.name == 'posix':
# Use non-blocking busy read to avoid using unsecure TIOCSTI from console.cancel().
# TIOCSTI is not supported on kernels newer than 6.2.
import termios
new = termios.tcgetattr(self.console.fd)
# new[6] - 'cc': a list of the tty special characters
new[6][termios.VMIN] = 0 # minimum bytes to read
new[6][termios.VTIME] = 2 # timer of 0.1 second granularity
termios.tcsetattr(self.console.fd, termios.TCSANOW, new)
try:
while self.alive:
try:
if os.name == 'nt':
# Windows kludge: because the console.cancel() method doesn't
# seem to work to unblock getkey() on the Windows implementation.
#
# So we only call getkey() if we know there's a key waiting for us.
import msvcrt
while not msvcrt.kbhit() and self.alive: # type: ignore
time.sleep(0.1)
if not self.alive:
break
elif self.test_mode:
# In testing mode the stdin is connected to PTY but is not used for input anything. For PTY
# the canceling by fcntl.ioctl isn't working and would hang in self.console.getkey().
# Therefore, we avoid calling it.
while self.alive:
time.sleep(0.1)
break
c = self.console.getkey()
except KeyboardInterrupt:
c = '\x03'
if c:
ret = self.parser.parse(c)
if ret is not None:
(tag, cmd) = ret
# stop command should be executed last
if tag == TAG_CMD and cmd != CMD_STOP:
self.cmd_queue.put(ret)
else:
self.event_queue.put(ret)
finally:
self.console.cleanup()