mirror of
https://github.com/espressif/esp-idf.git
synced 2024-10-05 20:47:46 -04:00
Merge branch 'feature/py3_idf_monitor' into 'master'
tools: Support Python3 in idf_monitor See merge request idf/esp-idf!2930
This commit is contained in:
commit
3adf41a5fe
@ -358,7 +358,12 @@ test_idf_monitor:
|
||||
expire_in: 1 week
|
||||
script:
|
||||
- cd ${IDF_PATH}/tools/test_idf_monitor
|
||||
- source /opt/pyenv/activate
|
||||
- pyenv global 2.7.15
|
||||
- ./run_test_idf_monitor.py
|
||||
- pyenv global 3.4.8
|
||||
- ./run_test_idf_monitor.py
|
||||
- pyenv global system
|
||||
|
||||
test_esp_err_to_name_on_host:
|
||||
<<: *host_test_template
|
||||
|
@ -28,6 +28,12 @@
|
||||
# Originally released under BSD-3-Clause license.
|
||||
#
|
||||
from __future__ import print_function, division
|
||||
from __future__ import unicode_literals
|
||||
from future import standard_library
|
||||
standard_library.install_aliases()
|
||||
from builtins import chr
|
||||
from builtins import object
|
||||
from builtins import bytes
|
||||
import subprocess
|
||||
import argparse
|
||||
import codecs
|
||||
@ -223,7 +229,7 @@ class SerialReader(StoppableThread):
|
||||
except:
|
||||
pass
|
||||
|
||||
class LineMatcher:
|
||||
class LineMatcher(object):
|
||||
"""
|
||||
Assembles a dictionary of filtering rules based on the --print_filter
|
||||
argument of idf_monitor. Then later it is used to match lines and
|
||||
@ -295,7 +301,7 @@ class Monitor(object):
|
||||
self.event_queue = queue.Queue()
|
||||
self.console = miniterm.Console()
|
||||
if os.name == 'nt':
|
||||
sys.stderr = ANSIColorConverter(sys.stderr)
|
||||
sys.stderr = ANSIColorConverter(sys.stderr, decode_output=True)
|
||||
self.console.output = ANSIColorConverter(self.console.output)
|
||||
self.console.byte_output = ANSIColorConverter(self.console.byte_output)
|
||||
|
||||
@ -303,8 +309,8 @@ class Monitor(object):
|
||||
# Use Console.getkey implementation from 3.3.0 (to be in sync with the ConsoleReader._cancel patch above)
|
||||
def getkey_patched(self):
|
||||
c = self.enc_stdin.read(1)
|
||||
if c == unichr(0x7f):
|
||||
c = unichr(8) # map the BS key (which yields DEL) to backspace
|
||||
if c == chr(0x7f):
|
||||
c = chr(8) # map the BS key (which yields DEL) to backspace
|
||||
return c
|
||||
|
||||
self.console.getkey = types.MethodType(getkey_patched, self.console)
|
||||
@ -320,9 +326,9 @@ class Monitor(object):
|
||||
self.exit_key = CTRL_RBRACKET
|
||||
|
||||
self.translate_eol = {
|
||||
"CRLF": lambda c: c.replace(b"\n", b"\r\n"),
|
||||
"CR": lambda c: c.replace(b"\n", b"\r"),
|
||||
"LF": lambda c: c.replace(b"\r", b"\n"),
|
||||
"CRLF": lambda c: c.replace("\n", "\r\n"),
|
||||
"CR": lambda c: c.replace("\n", "\r"),
|
||||
"LF": lambda c: c.replace("\r", "\n"),
|
||||
}[eol]
|
||||
|
||||
# internal state
|
||||
@ -404,9 +410,9 @@ class Monitor(object):
|
||||
self._last_line_part = sp.pop()
|
||||
for line in sp:
|
||||
if line != b"":
|
||||
if self._serial_check_exit and line == self.exit_key:
|
||||
if self._serial_check_exit and line == self.exit_key.encode('latin-1'):
|
||||
raise SerialStopException()
|
||||
if self._output_enabled and (self._force_line_print or self._line_matcher.match(line)):
|
||||
if self._output_enabled and (self._force_line_print or self._line_matcher.match(line.decode(errors="ignore"))):
|
||||
self.console.write_bytes(line + b'\n')
|
||||
self.handle_possible_pc_address_in_line(line)
|
||||
self.check_gdbstub_trigger(line)
|
||||
@ -416,7 +422,7 @@ class Monitor(object):
|
||||
# of the line. But after some time when we didn't received it we need
|
||||
# to make a decision.
|
||||
if self._last_line_part != b"":
|
||||
if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part)):
|
||||
if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part.decode(errors="ignore"))):
|
||||
self._force_line_print = True;
|
||||
if self._output_enabled:
|
||||
self.console.write_bytes(self._last_line_part)
|
||||
@ -438,7 +444,7 @@ class Monitor(object):
|
||||
def handle_possible_pc_address_in_line(self, line):
|
||||
line = self._pc_address_buffer + line
|
||||
self._pc_address_buffer = b""
|
||||
for m in re.finditer(MATCH_PCADDR, line):
|
||||
for m in re.finditer(MATCH_PCADDR, line.decode(errors="ignore")):
|
||||
self.lookup_pc_address(m.group())
|
||||
|
||||
def handle_menu_key(self, c):
|
||||
@ -547,8 +553,8 @@ class Monitor(object):
|
||||
["%saddr2line" % self.toolchain_prefix,
|
||||
"-pfiaC", "-e", self.elf_file, pc_addr],
|
||||
cwd=".")
|
||||
if not "?? ??:0" in translation:
|
||||
yellow_print(translation)
|
||||
if not b"?? ??:0" in translation:
|
||||
yellow_print(translation.decode())
|
||||
|
||||
def check_gdbstub_trigger(self, line):
|
||||
line = self._gdb_buffer + line
|
||||
@ -556,7 +562,7 @@ class Monitor(object):
|
||||
m = re.search(b"\\$(T..)#(..)", line) # look for a gdb "reason" for a break
|
||||
if m is not None:
|
||||
try:
|
||||
chsum = sum(ord(p) for p in m.group(1)) & 0xFF
|
||||
chsum = sum(ord(bytes([p])) for p in m.group(1)) & 0xFF
|
||||
calc_chsum = int(m.group(2), 16)
|
||||
except ValueError:
|
||||
return # payload wasn't valid hex digits
|
||||
@ -708,14 +714,18 @@ if os.name == 'nt':
|
||||
least-bad working solution, as winpty doesn't support any "passthrough" mode for raw output.
|
||||
"""
|
||||
|
||||
def __init__(self, output):
|
||||
def __init__(self, output=None, decode_output=False):
|
||||
self.output = output
|
||||
self.decode_output = decode_output
|
||||
self.handle = GetStdHandle(STD_ERROR_HANDLE if self.output == sys.stderr else STD_OUTPUT_HANDLE)
|
||||
self.matched = b''
|
||||
|
||||
def _output_write(self, data):
|
||||
try:
|
||||
self.output.write(data)
|
||||
if self.decode_output:
|
||||
self.output.write(data.decode())
|
||||
else:
|
||||
self.output.write(data)
|
||||
except IOError:
|
||||
# Windows 10 bug since the Fall Creators Update, sometimes writing to console randomly throws
|
||||
# an exception (however, the character is still written to the screen)
|
||||
@ -723,13 +733,18 @@ if os.name == 'nt':
|
||||
pass
|
||||
|
||||
def write(self, data):
|
||||
if type(data) is not bytes:
|
||||
data = data.encode('latin-1')
|
||||
for b in data:
|
||||
b = bytes([b])
|
||||
l = len(self.matched)
|
||||
if b == '\033': # ESC
|
||||
if b == b'\033': # ESC
|
||||
self.matched = b
|
||||
elif (l == 1 and b == '[') or (1 < l < 7):
|
||||
elif (l == 1 and b == b'[') or (1 < l < 7):
|
||||
self.matched += b
|
||||
if self.matched == ANSI_NORMAL: # reset console
|
||||
if self.matched == ANSI_NORMAL.encode('latin-1'): # reset console
|
||||
# Flush is required only with Python3 - switching color before it is printed would mess up the console
|
||||
self.flush()
|
||||
SetConsoleTextAttribute(self.handle, FOREGROUND_GREY)
|
||||
self.matched = b''
|
||||
elif len(self.matched) == 7: # could be an ANSI sequence
|
||||
@ -738,6 +753,8 @@ if os.name == 'nt':
|
||||
color = ANSI_TO_WINDOWS_COLOR[int(m.group(2))]
|
||||
if m.group(1) == b'1':
|
||||
color |= FOREGROUND_INTENSITY
|
||||
# Flush is required only with Python3 - switching color before it is printed would mess up the console
|
||||
self.flush()
|
||||
SetConsoleTextAttribute(self.handle, color)
|
||||
else:
|
||||
self._output_write(self.matched) # not an ANSI color code, display verbatim
|
||||
@ -749,6 +766,5 @@ if os.name == 'nt':
|
||||
def flush(self):
|
||||
self.output.flush()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
@ -14,6 +14,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import print_function
|
||||
from __future__ import unicode_literals
|
||||
from builtins import object
|
||||
from io import open
|
||||
import os
|
||||
import signal
|
||||
import time
|
||||
@ -34,11 +38,11 @@ test_list = (
|
||||
in_dir = 'tests/' # tests are in this directory
|
||||
out_dir = 'outputs/' # test results are written to this directory (kept only for debugging purposes)
|
||||
socat_in = './socatfile'# temporary socat file (deleted after run)
|
||||
monitor_error_output = out_dir + 'monitor_error_output'
|
||||
err_out = out_dir + 'monitor_error_output'
|
||||
elf_file = './dummy.elf' # ELF file used for starting the monitor
|
||||
idf_monitor = '{}/tools/idf_monitor.py'.format(os.getenv("IDF_PATH"))
|
||||
|
||||
class SocatRunner:
|
||||
class SocatRunner(object):
|
||||
"""
|
||||
Runs socat in the background for creating a socket.
|
||||
"""
|
||||
@ -54,7 +58,7 @@ class SocatRunner:
|
||||
'-U', # unidirectional pipe from file to port
|
||||
'TCP4-LISTEN:2399,reuseaddr,fork',
|
||||
'exec:"tail -c 1GB -F ' + socat_in + '"']
|
||||
print ' '.join(socat_cmd)
|
||||
print(' '.join(socat_cmd))
|
||||
self._socat_process = subprocess.Popen(socat_cmd, preexec_fn=os.setsid) # See __exit__ for os.setsid
|
||||
return self
|
||||
|
||||
@ -82,38 +86,38 @@ def main():
|
||||
# another reason while the temporary socat_in file is used instead of directly reading the test files).
|
||||
time.sleep(1)
|
||||
for t in test_list:
|
||||
print 'Running test on {} with filter "{}" and expecting {}'.format(t[0], t[1], t[2])
|
||||
with open(in_dir + t[0], "r") as input_file, open(socat_in, "w") as socat_file:
|
||||
print 'cat {} > {}'.format(input_file.name, socat_file.name)
|
||||
for line in input_file:
|
||||
socat_file.write(line)
|
||||
print('Running test on {} with filter "{}" and expecting {}'.format(t[0], t[1], t[2]))
|
||||
with open(in_dir + t[0], "r", encoding='utf-8') as i_f, open(socat_in, "w", encoding='utf-8') as s_f:
|
||||
print('cat {} > {}'.format(i_f.name, s_f.name))
|
||||
for line in i_f:
|
||||
s_f.write(line)
|
||||
idf_exit_sequence = b'\x1d\n'
|
||||
print 'echo "<exit>" >> {}'.format(socat_file.name)
|
||||
socat_file.write(idf_exit_sequence)
|
||||
print('echo "<exit>" >> {}'.format(s_f.name))
|
||||
s_f.write(idf_exit_sequence.decode())
|
||||
monitor_cmd = [idf_monitor,
|
||||
'--port', 'socket://localhost:2399',
|
||||
'--print_filter', t[1],
|
||||
elf_file]
|
||||
with open(out_dir + t[2], "w") as mon_out_f, open(monitor_error_output, "w") as mon_err_f:
|
||||
with open(out_dir + t[2], "w", encoding='utf-8') as o_f, open(err_out, "w", encoding='utf-8') as e_f:
|
||||
try:
|
||||
(master_fd, slave_fd) = pty.openpty()
|
||||
print ' '.join(monitor_cmd),
|
||||
print ' > {} 2> {} < {}'.format(mon_out_f.name, mon_err_f.name, os.ttyname(slave_fd))
|
||||
proc = subprocess.Popen(monitor_cmd, stdin=slave_fd, stdout=mon_out_f, stderr=mon_err_f,
|
||||
print(' '.join(monitor_cmd), end=' ')
|
||||
print(' > {} 2> {} < {}'.format(o_f.name, e_f.name, os.ttyname(slave_fd)))
|
||||
proc = subprocess.Popen(monitor_cmd, stdin=slave_fd, stdout=o_f, stderr=e_f,
|
||||
close_fds=True)
|
||||
proc.wait()
|
||||
finally:
|
||||
os.close(slave_fd)
|
||||
os.close(master_fd)
|
||||
diff_cmd = ['diff', in_dir + t[2], out_dir + t[2]]
|
||||
print ' '.join(diff_cmd)
|
||||
print(' '.join(diff_cmd))
|
||||
subprocess.check_call(diff_cmd)
|
||||
print 'Test has passed'
|
||||
print('Test has passed')
|
||||
finally:
|
||||
cleanup()
|
||||
|
||||
end = time.time()
|
||||
print 'Execution took {:.2f} seconds'.format(end - start)
|
||||
print('Execution took {:.2f} seconds'.format(end - start))
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
Loading…
Reference in New Issue
Block a user